Spaces:
Build error
```python
# services/pdf_service.py
from pathlib import Path
from typing import List, Dict, Any, Optional
from PyPDF2 import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import faiss
import numpy as np
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
from datetime import datetime

from config.config import settings

logger = logging.getLogger(__name__)


class PDFService:
    def __init__(self, model_service):
        self.embedder = model_service.embedder
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=settings.CHUNK_SIZE,
            chunk_overlap=settings.CHUNK_OVERLAP,
            separators=["\n\n", "\n", ".", "!", "?", ",", " ", ""]
        )
        self.index = None
        self.chunks = []
        self.last_update = None
        self.pdf_metadata = {}

    def process_pdf(self, pdf_path: Path) -> List[Dict[str, Any]]:
        """Process a single PDF file - now synchronous"""
        try:
            reader = PdfReader(str(pdf_path))
            chunks = []

            # Extract metadata (reader.metadata can be None for PDFs without an info dict)
            raw_metadata = reader.metadata or {}
            metadata = {
                'title': raw_metadata.get('/Title', ''),
                'author': raw_metadata.get('/Author', ''),
                'creation_date': raw_metadata.get('/CreationDate', ''),
                'pages': len(reader.pages),
                'filename': pdf_path.name
            }
            self.pdf_metadata[pdf_path.name] = metadata

            # Process each page
            for page_num, page in enumerate(reader.pages):
                text = page.extract_text()
                if not text:
                    continue

                page_chunks = self.text_splitter.split_text(text)
                for i, chunk in enumerate(page_chunks):
                    chunks.append({
                        'text': chunk,
                        'source': pdf_path.name,
                        'page': page_num + 1,
                        'chunk_index': i,
                        'metadata': metadata,
                        'timestamp': datetime.now().isoformat()
                    })

            print("--------------------------- chunks ----------------------------------")
            print(chunks)
            return chunks

        except Exception as e:
            logger.error(f"Error processing PDF {pdf_path}: {e}")
            return []

    async def index_pdfs(self, pdf_folder: Path = settings.PDF_FOLDER) -> None:
        """Index all PDFs in the specified folder"""
        try:
            pdf_files = list(pdf_folder.glob('*.pdf'))
            if not pdf_files:
                logger.warning(f"No PDF files found in {pdf_folder}")
                return

            # Process PDFs in a thread pool so the event loop is not blocked
            # (the whole batch runs inside a single worker via the lambda)
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor() as executor:
                chunk_lists = await loop.run_in_executor(
                    executor,
                    lambda: [self.process_pdf(pdf_file) for pdf_file in pdf_files]
                )

            # Combine all chunks
            self.chunks = []
            for chunk_list in chunk_lists:
                self.chunks.extend(chunk_list)

            if not self.chunks:
                logger.warning("No text chunks extracted from PDFs")
                return

            # Create FAISS index
            texts = [chunk['text'] for chunk in self.chunks]
            embeddings = await loop.run_in_executor(
                None,
                lambda: self.embedder.encode(
                    texts,
                    convert_to_tensor=True,
                    show_progress_bar=True
                ).cpu().detach().numpy()
            )

            # FAISS expects a contiguous float32 array
            embeddings = np.ascontiguousarray(embeddings, dtype=np.float32)
            dimension = embeddings.shape[1]
            self.index = faiss.IndexFlatL2(dimension)
            self.index.add(embeddings)
            self.last_update = datetime.now()

            logger.info(f"Indexed {len(self.chunks)} chunks from {len(pdf_files)} PDFs")

        except Exception as e:
            logger.error(f"Error indexing PDFs: {e}")
            raise

    async def search(
        self,
        query: str,
        top_k: int = 5,
        min_score: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Search indexed PDFs with debug logs"""
        print("--------------------------- query ----------------------------------")
        print(query)

        # Lazily build the index on first use
        if not self.index or not self.chunks:
            await self.index_pdfs()

        try:
            # Create query embedding
            query_embedding = self.embedder.encode([query], convert_to_tensor=True)
            query_embedding_np = query_embedding.cpu().detach().numpy()
            print("Query Embedding Shape:", query_embedding_np.shape)

            # Search in FAISS index
            distances, indices = self.index.search(query_embedding_np, top_k)
            print("Distances:", distances)
            print("Indices:", indices)

            # Process results
            results = []
            for i, idx in enumerate(indices[0]):
                # FAISS pads with -1 when fewer than top_k vectors are indexed
                if idx < 0 or idx >= len(self.chunks):
                    continue  # Skip invalid indices

                score = 1 - distances[0][i]  # Convert distance to similarity score
                print(f"Chunk Index: {idx}, Distance: {distances[0][i]}, Score: {score}")
                print("----- score < min_score")
                print(score < min_score)
                if score < min_score:
                    print("skipped ---- ")
                    # continue  # Skip low scores

                chunk = self.chunks[idx].copy()
                chunk['score'] = score
                print("---- chunk")
                print(chunk)
                results.append(chunk)

            # Sort by score and take top_k
            results.sort(key=lambda x: x['score'], reverse=True)
            print("--------------------------- results ----------------------------------")
            print(results)
            return results[:top_k]

        except Exception as e:
            logger.error(f"Error searching PDFs: {e}")
            raise
```
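For reference, here is a minimal sketch of how this service might be driven outside the Space. `ModelService`, the model name, and the `data/pdfs` folder are illustrative assumptions; the real app presumably provides its own model service and reads the folder from `config.config.settings`.

```python
# Hypothetical wiring for PDFService; ModelService and the model name are
# illustrative stand-ins, not the Space's actual objects.
import asyncio
from pathlib import Path
from sentence_transformers import SentenceTransformer

from services.pdf_service import PDFService


class ModelService:
    """Minimal stand-in exposing the .embedder attribute PDFService expects."""
    def __init__(self):
        self.embedder = SentenceTransformer("all-MiniLM-L6-v2")


async def main():
    service = PDFService(ModelService())
    await service.index_pdfs(Path("data/pdfs"))  # folder path is an assumption
    hits = await service.search("What does chapter 2 cover?", top_k=3)
    for hit in hits:
        print(hit["source"], hit["page"], round(hit["score"], 3))


if __name__ == "__main__":
    asyncio.run(main())
```

One likely reason the `continue` on `min_score` is commented out: `1 - L2_distance` is not bounded to `[0, 1]` for unnormalized embeddings, so scores can easily go negative and a 0.5 threshold rejects everything. A possible alternative, not what the code above does, is to L2-normalize the embeddings and use an inner-product index so the score is a true cosine similarity:

```python
# Alternative scoring sketch: cosine similarity stays in [-1, 1], so a fixed
# min_score threshold becomes meaningful.
import faiss
import numpy as np

def build_cosine_index(embeddings: np.ndarray) -> faiss.IndexFlatIP:
    emb = np.ascontiguousarray(embeddings, dtype=np.float32)
    faiss.normalize_L2(emb)                  # in-place L2 normalization
    index = faiss.IndexFlatIP(emb.shape[1])  # inner product == cosine after normalization
    index.add(emb)
    return index
```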