|
|
|
|
|
import io |
|
import PyPDF2 |
|
import docx |
|
import logging |
|
from typing import Optional, Dict, List, Any |
|
import os |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
def _extract_text_from_pdf_bytes(file_bytes: bytes, filename: str) -> Optional[str]:
    """Extract the text of an in-memory PDF.

    Args:
        file_bytes: Raw content of the PDF file.
        filename: Name of the file; used only in log messages.

    Returns:
        The text of all pages joined with newlines and stripped of
        surrounding whitespace, or ``None`` when no text could be extracted
        or any error occurred while reading the PDF (the error is logged).
    """
    try:
        reader = PyPDF2.PdfReader(io.BytesIO(file_bytes))
        # Run the (expensive) extraction exactly once per page; pages that
        # yield no text are dropped by ``filter(None, ...)``.
        page_texts = filter(None, (page.extract_text() for page in reader.pages))
        full_text = "\n".join(page_texts).strip()
        logger.info(f"Successfully extracted text from PDF '{filename}'. Length: {len(full_text)} chars.")
        return full_text or None
    except Exception as e:
        # Best-effort boundary: any parser failure is logged with its
        # traceback and reported to the caller as "no text".
        logger.error(f"Error extracting PDF '{filename}': {e}", exc_info=True)
        return None
|
def _extract_text_from_docx_bytes(file_bytes: bytes, filename: str) -> Optional[str]:
    """Extract the paragraph text of an in-memory DOCX document.

    Only ``document.paragraphs`` is read; paragraphs that are empty or
    whitespace-only are skipped and the rest are joined with newlines.

    Args:
        file_bytes: Raw content of the DOCX file.
        filename: Name of the file; used only in log messages.

    Returns:
        The stripped text, or ``None`` when the document yields no text or
        any error occurred while reading it (the error is logged).
    """
    try:
        document = docx.Document(io.BytesIO(file_bytes))
        non_blank_paragraphs = (
            paragraph.text
            for paragraph in document.paragraphs
            if paragraph.text.strip()
        )
        full_text = "\n".join(non_blank_paragraphs).strip()
        logger.info(f"Successfully extracted text from DOCX '{filename}'. Length: {len(full_text)} chars.")
        if not full_text:
            return None
        return full_text
    except Exception as e:
        logger.error(f"Error extracting text from DOCX '{filename}': {e}", exc_info=True)
        return None
|
|
|
def _extract_text_from_txt_bytes(file_bytes: bytes, filename: str) -> Optional[str]:
    """Decode an in-memory plain-text file.

    UTF-8 is tried first (a leading byte-order mark is discarded); if the
    bytes are not valid UTF-8 the content is decoded as latin-1 instead.

    Args:
        file_bytes: Raw content of the text file.
        filename: Name of the file; used only in log messages.

    Returns:
        The decoded text stripped of surrounding whitespace, or ``None``
        when the file is empty/whitespace-only or decoding failed
        unexpectedly (the error is logged).
    """
    try:
        try:
            # "utf-8-sig" decodes plain UTF-8 as well, but additionally drops
            # a leading BOM, which would otherwise survive as "\ufeff"
            # (str.strip() does not treat it as whitespace).
            text = file_bytes.decode('utf-8-sig')
        except UnicodeDecodeError:
            logger.warning(f"UTF-8 decoding failed for '{filename}', trying latin-1.")
            # latin-1 maps every possible byte value to a code point, so this
            # fallback cannot raise UnicodeDecodeError.
            text = file_bytes.decode('latin-1')
    except Exception as e:
        # Covers anything unexpected, e.g. an argument that is not bytes-like.
        logger.error(f"Error reading TXT file '{filename}': {e}", exc_info=True)
        return None

    full_text = text.strip()
    logger.info(f"Successfully read text from TXT '{filename}'. Length: {len(full_text)} chars.")
    return full_text or None
|
|
|
|
|
def process_uploaded_files(uploaded_files: Optional[List[Any]]) -> Optional[Dict[str, str]]:
    """Extract text from every supported file in ``uploaded_files``.

    Each item must expose a ``name`` attribute; that value is opened as a
    path on disk and the file is dispatched by its (case-insensitive)
    extension to the PDF, DOCX or TXT extractor. Processing is best-effort:
    files with an unsupported extension, files that cannot be read and files
    that yield no text are logged (where applicable) and skipped.

    Args:
        uploaded_files: Uploaded file objects, or ``None``/empty.

    Returns:
        A mapping from each file's base name to its extracted text, or
        ``None`` when there was no input or no file produced any text. Files
        sharing a base name overwrite one another; the last one wins.
    """
    if not uploaded_files:
        return None

    # (suffix, extractor) pairs, matched in order against the lower-cased name.
    extractors = (
        ('.pdf', _extract_text_from_pdf_bytes),
        ('.docx', _extract_text_from_docx_bytes),
        ('.txt', _extract_text_from_txt_bytes),
    )

    logger.info("Processing uploaded files.")
    extracted_texts: Dict[str, str] = {}

    for uploaded_file in uploaded_files:
        filename = uploaded_file.name
        lowered_name = filename.lower()

        # Resolve the extractor before touching the disk so unsupported
        # files are never read into memory.
        extractor = next(
            (func for suffix, func in extractors if lowered_name.endswith(suffix)),
            None,
        )
        if extractor is None:
            logger.warning(f"Skipping '{filename}': unsupported file type.")
            continue

        try:
            with open(filename, 'rb') as f:
                content_bytes = f.read()
        except OSError as e:
            # One unreadable file must not discard the text already
            # extracted from the others.
            logger.error(f"Could not read uploaded file '{filename}': {e}", exc_info=True)
            continue

        text_content = extractor(content_bytes, filename)
        if text_content:
            extracted_texts[os.path.basename(filename)] = text_content

    logger.info(f"Processing done. Extracted text from {len(extracted_texts)} file(s).")
    return extracted_texts or None