# FILE: resource_processor.py
"""Extract plain text from uploaded PDF, DOCX and TXT files.

The public entry point is :func:`process_uploaded_files`; the
``_extract_text_from_*_bytes`` helpers each turn the raw bytes of one file
into a stripped string, or ``None`` when nothing usable could be extracted.
"""

import io
import logging
import os
from typing import Any, Callable, Dict, List, Optional, Tuple

import docx
import PyPDF2

logger = logging.getLogger(__name__)


def _extract_text_from_pdf_bytes(file_bytes: bytes, filename: str) -> Optional[str]:
    """Return the concatenated text of every page of a PDF.

    Args:
        file_bytes: Raw content of the PDF file.
        filename: Name used in log messages only.

    Returns:
        The page texts joined by newlines and stripped, or ``None`` if the
        PDF could not be parsed or contains no extractable text.
    """
    try:
        reader = PyPDF2.PdfReader(io.BytesIO(file_bytes))
        # Call extract_text() exactly once per page: the previous version
        # called it twice (once to filter, once for the value), doubling the
        # extraction work.
        page_texts = (page.extract_text() for page in reader.pages)
        text = "\n".join(page_text for page_text in page_texts if page_text)
    except Exception as e:
        # Best-effort boundary: any parser failure means "no text", not a crash.
        logger.error(f"Error extracting PDF '{filename}': {e}", exc_info=True)
        return None

    full_text = text.strip()
    logger.info(
        f"Successfully extracted text from PDF '{filename}'. "
        f"Length: {len(full_text)} chars."
    )
    return full_text or None


def _extract_text_from_docx_bytes(file_bytes: bytes, filename: str) -> Optional[str]:
    """Return the text of all non-blank paragraphs of a DOCX document.

    Args:
        file_bytes: Raw content of the DOCX file.
        filename: Name used in log messages only.

    Returns:
        The paragraph texts joined by newlines and stripped, or ``None`` if
        the document could not be parsed or has no non-blank paragraphs.
    """
    try:
        document = docx.Document(io.BytesIO(file_bytes))
        # NOTE(review): only ``document.paragraphs`` is read; text held in
        # other containers (e.g. tables) is presumably not included - confirm
        # whether that matters for callers.
        text = "\n".join(
            para.text for para in document.paragraphs if para.text.strip()
        )
    except Exception as e:
        # Best-effort boundary: any parser failure means "no text", not a crash.
        logger.error(f"Error extracting text from DOCX '{filename}': {e}", exc_info=True)
        return None

    full_text = text.strip()
    logger.info(
        f"Successfully extracted text from DOCX '{filename}'. "
        f"Length: {len(full_text)} chars."
    )
    return full_text or None


def _extract_text_from_txt_bytes(file_bytes: bytes, filename: str) -> Optional[str]:
    """Decode a plain-text file, trying UTF-8 first and latin-1 as fallback.

    Args:
        file_bytes: Raw content of the text file.
        filename: Name used in log messages only.

    Returns:
        The decoded, stripped text, or ``None`` if the input could not be
        decoded at all or is empty after stripping.
    """
    try:
        text = file_bytes.decode("utf-8")
    except UnicodeDecodeError:
        logger.warning(f"UTF-8 decoding failed for '{filename}', trying latin-1.")
        # latin-1 assigns a code point to every byte value, so this decode
        # cannot raise; no further error handling is needed here.
        text = file_bytes.decode("latin-1")
    except Exception as e:
        # Reached when ``file_bytes`` is not bytes-like (e.g. has no .decode).
        logger.error(f"Error reading TXT file '{filename}': {e}", exc_info=True)
        return None

    full_text = text.strip()
    logger.info(
        f"Successfully read text from TXT '{filename}'. "
        f"Length: {len(full_text)} chars."
    )
    return full_text or None


# Lower-case filename suffix -> extractor, checked in order with endswith().
_EXTRACTORS: Tuple[Tuple[str, Callable[[bytes, str], Optional[str]]], ...] = (
    (".pdf", _extract_text_from_pdf_bytes),
    (".docx", _extract_text_from_docx_bytes),
    (".txt", _extract_text_from_txt_bytes),
)


def _select_extractor(path: str) -> Optional[Callable[[bytes, str], Optional[str]]]:
    """Return the extractor matching the (case-insensitive) suffix of *path*."""
    lowered = path.lower()
    for suffix, extractor in _EXTRACTORS:
        if lowered.endswith(suffix):
            return extractor
    return None


def process_uploaded_files(uploaded_files: Optional[List[Any]]) -> Optional[Dict[str, str]]:
    """Extract text from every supported file in *uploaded_files*.

    Each item is either an object whose ``.name`` attribute is the path of a
    file on disk (in Gradio, ``.name`` is the path to a temporary file) or a
    plain path (``str`` / ``os.PathLike``). Files are dispatched by suffix:
    ``.pdf``, ``.docx`` and ``.txt`` are supported; anything else is skipped
    with a warning. A file that cannot be read, or that yields no text, is
    skipped without aborting the rest of the batch.

    Args:
        uploaded_files: The uploaded items, or ``None`` / empty.

    Returns:
        A dict mapping each file's basename to its extracted text, or
        ``None`` when there was no input or nothing could be extracted.
        If two files share a basename, the later one overwrites the earlier.
    """
    if not uploaded_files:
        return None

    logger.info("Processing uploaded files.")
    extracted_texts: Dict[str, str] = {}

    for uploaded_file in uploaded_files:
        # Accept both file-like wrappers (path in ``.name``) and bare paths.
        filename = os.fspath(getattr(uploaded_file, "name", uploaded_file))

        # Decide before reading so unsupported files are never loaded.
        extractor = _select_extractor(filename)
        if extractor is None:
            logger.warning(f"Skipping '{filename}': unsupported file type.")
            continue

        try:
            with open(filename, "rb") as f:
                content_bytes = f.read()
        except OSError as e:
            # One unreadable file must not abort the whole batch.
            logger.error(f"Could not read uploaded file '{filename}': {e}", exc_info=True)
            continue

        text_content = extractor(content_bytes, filename)
        if text_content:
            # The basename of the path is used as the key.
            extracted_texts[os.path.basename(filename)] = text_content

    logger.info(f"Processing done. Extracted text from {len(extracted_texts)} file(s).")
    return extracted_texts or None