"""AskMyDocs API (free LLM edition).

A FastAPI service that stores uploaded PDF / Markdown / plain-text files,
indexes them per collection in a FAISS vector store, and answers natural
language queries through an OpenRouter free-model adapter.
"""

import html
import logging
import os
import re
import shutil
import uuid
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Optional

import fitz  # PyMuPDF
import markdown
from dotenv import load_dotenv
from fastapi import (
    BackgroundTasks,
    Depends,
    FastAPI,
    File,
    Form,
    HTTPException,
    UploadFile,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document as LangchainDocument
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from pydantic import BaseModel

from openrouter_llm import OpenRouterFreeAdapter, OpenRouterFreeChain

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI(title="AskMyDocs API - Free LLM Edition")

# Add CORS middleware for frontend integration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Set to specific domain in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Configuration
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
HF_MODEL_NAME = os.getenv(
    "HF_MODEL_NAME", "sentence-transformers/all-mpnet-base-v2")
UPLOAD_DIR = os.getenv("UPLOAD_DIR", "./uploads")
DB_DIR = os.getenv("DB_DIR", "./vectordb")
logger.info("Embedding model: %s", HF_MODEL_NAME)

# Ensure directories exist
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(DB_DIR, exist_ok=True)

# Initialize OpenRouter adapter (singleton)
openrouter_adapter = None

# File extensions the service can index, mapped to the content type that
# extract_text_from_file() dispatches on.
CONTENT_TYPES_BY_EXTENSION = {
    ".pdf": "application/pdf",
    ".md": "text/markdown",
    ".txt": "text/plain",
}
SUPPORTED_CONTENT_TYPES = frozenset(CONTENT_TYPES_BY_EXTENSION.values())

# Tag -> replacement pairs applied before the generic tag stripper so that
# paragraphs, line breaks and headings keep a readable plain-text shape.
_MARKDOWN_TAG_REPLACEMENTS = (
    ("<p>", "\n\n"),
    ("</p>", ""),
    ("<br />", "\n"),
    ("<br/>", "\n"),
    ("<br>", "\n"),
    ("<h1>", "\n\n# "),
    ("</h1>", "\n"),
    ("<h2>", "\n\n## "),
    ("</h2>", "\n"),
    ("<h3>", "\n\n### "),
    ("</h3>", "\n"),
)
_HTML_TAG_RE = re.compile(r"<[^<]+?>")


# Pydantic models
class QueryRequest(BaseModel):
    """Request body for POST /query."""

    query: str
    collection_id: str


class QueryResponse(BaseModel):
    """Answer text plus the filenames of the chunks it was drawn from."""

    answer: str
    sources: List[str]


class Document(BaseModel):
    """Metadata describing one uploaded file."""

    id: str
    filename: str
    content_type: str


class DocumentList(BaseModel):
    """Response body for listing a collection's documents."""

    documents: List[Document]


class LLMInfo(BaseModel):
    """Identifies an LLM; also used as the body of POST /llm/change-model."""

    model: str
    is_free: bool = True
    provider: str = "openrouter"


class LLMModelsList(BaseModel):
    """Current model plus a simplified list of the available free models."""

    current_model: str
    free_models: List[Dict[str, Any]]


# Global variable to store vector databases (in memory for simplicity)
# In production, you would use persistent storage
vector_dbs = {}


# Helper functions
def _validate_path_component(value: Optional[str], label: str) -> str:
    """Return *value* if it is safe to use as a single path component.

    Collection IDs and filenames come from the client and are joined into
    filesystem paths, so anything that could escape the target directory
    (separators, "." / "..", NUL, empty) is rejected.

    Raises:
        HTTPException: 400 when the value is not a plain name.
    """
    if (
        not value
        or value in {".", ".."}
        or "/" in value
        or "\\" in value
        or "\x00" in value
    ):
        raise HTTPException(status_code=400, detail=f"Invalid {label}")
    return value


def _guess_content_type(
    filename: str, default: Optional[str] = "application/octet-stream"
) -> Optional[str]:
    """Map a filename's extension (case-insensitive) to a content type."""
    lowered = filename.lower()
    for extension, content_type in CONTENT_TYPES_BY_EXTENSION.items():
        if lowered.endswith(extension):
            return content_type
    return default


def _list_indexable_files(collection_dir: str) -> List[tuple]:
    """Return (path, content_type, filename) for every indexable file.

    Files whose extension is not supported are skipped instead of being
    handed to the extractor, which would reject them and abort the rebuild.
    """
    if not os.path.isdir(collection_dir):
        return []
    entries = []
    for fname in sorted(os.listdir(collection_dir)):
        fpath = os.path.join(collection_dir, fname)
        content_type = _guess_content_type(fname, default=None)
        if content_type is not None and os.path.isfile(fpath):
            entries.append((fpath, content_type, fname))
    return entries


@lru_cache(maxsize=1)
def get_embeddings():
    """Get HuggingFace embedding model.

    The instance is cached so the model is loaded once per process rather
    than on every upload and every cold query.
    """
    return HuggingFaceEmbeddings(model_name=HF_MODEL_NAME)


def get_openrouter_adapter():
    """Get or initialize the OpenRouter adapter for free models."""
    global openrouter_adapter
    if openrouter_adapter is None:
        openrouter_adapter = OpenRouterFreeAdapter(api_key=OPENROUTER_API_KEY)
    return openrouter_adapter


def extract_text_from_pdf(file_path):
    """Extract text content from PDF files.

    Raises:
        HTTPException: 500 if the PDF cannot be opened or read.
    """
    try:
        # The context manager closes the document even if a page fails.
        with fitz.open(file_path) as doc:
            return "".join(page.get_text() for page in doc)
    except Exception as e:
        logger.error("Error extracting text from PDF: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error processing PDF: {str(e)}")


def extract_text_from_markdown(file_path):
    """Convert Markdown to plain text.

    The Markdown is rendered to HTML, a few structural tags are turned back
    into plain-text markers, all remaining tags are stripped, and HTML
    entities are decoded.

    Raises:
        HTTPException: 500 if the file cannot be read or rendered.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            md_content = f.read()
        text = markdown.markdown(md_content)
        # Simple HTML to text conversion - in production use a more robust method
        for tag, replacement in _MARKDOWN_TAG_REPLACEMENTS:
            text = text.replace(tag, replacement)
        # Remove other HTML tags
        text = _HTML_TAG_RE.sub('', text)
        # Decode entities last so escaped text such as "&lt;b&gt;" is not
        # mistaken for a tag by the stripper above.
        return html.unescape(text)
    except Exception as e:
        logger.error("Error processing Markdown: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error processing Markdown: {str(e)}")


def extract_text_from_file(file_path, content_type):
    """Extract text based on file type.

    Raises:
        HTTPException: 400 for an unsupported content type; 500 (from the
            PDF / Markdown extractors) if extraction fails.
    """
    if content_type == "application/pdf":
        return extract_text_from_pdf(file_path)
    if content_type == "text/markdown":
        return extract_text_from_markdown(file_path)
    if content_type == "text/plain":
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    raise HTTPException(
        status_code=400, detail=f"Unsupported file type: {content_type}")


def process_documents(collection_id: str, file_paths: List[tuple]):
    """Process documents and create vector store.

    Builds a fresh FAISS index from exactly the given files, saves it under
    DB_DIR/<collection_id> and caches it in ``vector_dbs``. Any previous
    index for the collection is replaced, so callers must pass every file
    that should remain searchable.

    Args:
        collection_id: Collection whose index is (re)built.
        file_paths: Iterable of ``(file_path, content_type, filename)``.

    Raises:
        HTTPException: Propagated from the extractors, or 500 for any other
            failure while splitting, embedding or saving.
    """
    try:
        # Create text splitter
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=100,
            length_function=len,
        )
        all_docs = []
        for file_path, content_type, filename in file_paths:
            text_content = extract_text_from_file(file_path, content_type)
            chunks = text_splitter.split_text(text_content)
            # Create Document objects with metadata
            all_docs.extend(
                LangchainDocument(
                    page_content=chunk,
                    metadata={"source": filename, "chunk": i},
                )
                for i, chunk in enumerate(chunks)
            )

        collection_path = os.path.join(DB_DIR, collection_id)
        if not all_docs:
            # FAISS cannot build an index from zero documents; drop any
            # stale index so queries do not answer from removed content.
            vector_dbs.pop(collection_id, None)
            shutil.rmtree(collection_path, ignore_errors=True)
            logger.warning(
                "No text chunks produced for collection %s; index cleared",
                collection_id)
            return

        # Create vector store
        embeddings = get_embeddings()
        vector_db = FAISS.from_documents(all_docs, embeddings)

        # Save vector store
        os.makedirs(collection_path, exist_ok=True)
        vector_db.save_local(collection_path)

        # Store in memory (would be replaced by database lookup in production)
        vector_dbs[collection_id] = vector_db
        logger.info(
            "Successfully processed %d chunks from %d documents",
            len(all_docs), len(file_paths))
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error processing documents: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error processing documents: {str(e)}")


@app.get("/")
async def index():
    """Return a welcome message."""
    return {"message": "Welcome to ask my doc"}


@app.get("/health")
async def health_check():
    """Liveness probe."""
    return {"status": "healthy"}


@app.post("/upload", response_model=Document)
async def upload_file(
    background_tasks: BackgroundTasks,
    collection_id: str = Form(...),
    file: UploadFile = File(...),
):
    """Upload a document and process it for querying."""
    try:
        _validate_path_component(collection_id, "collection id")
        # Some clients send a full path; keep only the final component.
        filename = _validate_path_component(
            os.path.basename((file.filename or "").replace("\\", "/")),
            "filename",
        )

        # Determine content type: trust the declared type only when it is
        # one we can index, otherwise fall back to the file extension.
        content_type = file.content_type
        if content_type not in SUPPORTED_CONTENT_TYPES:
            content_type = _guess_content_type(filename, default=None)
        if content_type is None:
            raise HTTPException(
                status_code=400, detail="Unsupported file type")

        # Generate a unique ID for the document
        doc_id = str(uuid.uuid4())

        # Create collection directory if it doesn't exist
        collection_dir = os.path.join(UPLOAD_DIR, collection_id)
        os.makedirs(collection_dir, exist_ok=True)

        # Save the file
        file_path = os.path.join(collection_dir, filename)
        with open(file_path, "wb") as out:
            shutil.copyfileobj(file.file, out)

        # process_documents() replaces the collection's index, so re-index
        # every file already in the collection together with the new one;
        # otherwise earlier uploads would stop being searchable.
        to_index = [
            entry for entry in _list_indexable_files(collection_dir)
            if entry[2] != filename
        ]
        to_index.append((file_path, content_type, filename))

        # Process the documents in the background
        background_tasks.add_task(process_documents, collection_id, to_index)

        return Document(
            id=doc_id,
            filename=filename,
            content_type=content_type,
        )
    except HTTPException:
        # Keep deliberate 4xx responses instead of masking them as 500.
        raise
    except Exception as e:
        logger.error("Error uploading file: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error uploading file: {str(e)}")


@app.get("/collections/{collection_id}/documents", response_model=DocumentList)
async def list_documents(collection_id: str):
    """List all documents in a collection."""
    try:
        _validate_path_component(collection_id, "collection id")
        collection_dir = os.path.join(UPLOAD_DIR, collection_id)
        if not os.path.exists(collection_dir):
            return DocumentList(documents=[])

        documents = [
            Document(
                # In production, store and retrieve actual IDs
                id=str(uuid.uuid4()),
                filename=filename,
                content_type=_guess_content_type(filename),
            )
            for filename in os.listdir(collection_dir)
            if os.path.isfile(os.path.join(collection_dir, filename))
        ]
        return DocumentList(documents=documents)
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error listing documents: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error listing documents: {str(e)}")


@app.post("/query", response_model=QueryResponse)
async def query_documents(request: QueryRequest):
    """Query documents using natural language."""
    try:
        collection_id = _validate_path_component(
            request.collection_id, "collection id")

        # Check if vector DB exists in memory
        vector_db = vector_dbs.get(collection_id)
        if vector_db is None:
            # Load from disk
            collection_path = os.path.join(DB_DIR, collection_id)
            if not os.path.exists(collection_path):
                raise HTTPException(
                    status_code=404,
                    detail=f"Collection {collection_id} not found")
            vector_db = FAISS.load_local(collection_path, get_embeddings())
            vector_dbs[collection_id] = vector_db

        # Get the retriever and the relevant documents
        retriever = vector_db.as_retriever(search_kwargs={"k": 3})
        docs = retriever.get_relevant_documents(request.query)

        # Extract distinct sources in retrieval order; skip chunks without
        # one so the response still satisfies List[str].
        sources = []
        for doc in docs:
            source = doc.metadata.get("source")
            if source is not None and source not in sources:
                sources.append(source)

        # Get context from documents
        context = [doc.page_content for doc in docs]

        # Get OpenRouter adapter for free LLMs and generate the answer
        chain = OpenRouterFreeChain(get_openrouter_adapter())
        answer = chain.run(request.query, context)

        return QueryResponse(answer=answer, sources=sources)
    except HTTPException:
        # Keep the 404 / 400 raised above instead of masking them as 500.
        raise
    except Exception as e:
        logger.error("Error querying documents: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error querying documents: {str(e)}")


@app.delete("/collections/{collection_id}/documents/{filename}")
async def delete_document(collection_id: str, filename: str):
    """Delete a document from a collection."""
    try:
        _validate_path_component(collection_id, "collection id")
        _validate_path_component(filename, "filename")

        collection_dir = os.path.join(UPLOAD_DIR, collection_id)
        file_path = os.path.join(collection_dir, filename)
        if not os.path.isfile(file_path):
            raise HTTPException(
                status_code=404, detail=f"Document {filename} not found")
        os.remove(file_path)

        # Drop the stale index (memory first, then disk) before rebuilding,
        # so a freshly rebuilt index is not evicted afterwards.
        vector_dbs.pop(collection_id, None)
        collection_path = os.path.join(DB_DIR, collection_id)
        if os.path.exists(collection_path):
            # In production, you would selectively remove documents rather
            # than rebuilding
            shutil.rmtree(collection_path)

        # If there are still documents, rebuild the vector store
        remaining = _list_indexable_files(collection_dir)
        if remaining:
            process_documents(collection_id, remaining)

        return JSONResponse(
            content={"message": f"Document {filename} deleted"})
    except HTTPException:
        raise
    except Exception as e:
        logger.error("Error deleting document: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error deleting document: {str(e)}")


@app.get("/llm/info", response_model=LLMInfo)
async def get_llm_info():
    """Get the current LLM information."""
    adapter = get_openrouter_adapter()
    return LLMInfo(
        model=adapter.model,
        is_free=True,
        provider="openrouter",
    )


@app.get("/llm/models", response_model=LLMModelsList)
async def list_free_models():
    """List all available free models."""
    adapter = get_openrouter_adapter()
    free_models = adapter.list_free_models()

    # Create a simplified list for the frontend
    model_list = []
    for model in free_models:
        model_id = model.get("id")
        # Tolerate entries without an id instead of failing the whole list.
        id_text = model_id or ""
        model_list.append({
            "id": model_id,
            "name": model.get("name", model_id),
            "context_length": model.get("context_length", 4096),
            "provider": id_text.split("/")[0] if "/" in id_text else "unknown",
        })

    return LLMModelsList(
        current_model=adapter.model,
        free_models=model_list,
    )


@app.post("/llm/change-model")
async def change_model(model_info: LLMInfo):
    """Change the LLM model (only to another free model)."""
    adapter = get_openrouter_adapter()

    # Make sure the model has the :free suffix if it doesn't already
    model_id = model_info.model
    if ":free" not in model_id:
        model_id = f"{model_id}:free"

    # Set the new model
    adapter.model = model_id
    return JSONResponse(content={"message": f"Model changed to {model_id}"})


if __name__ == "__main__":
    import uvicorn

    # Check if we have an OpenRouter adapter and initialize it
    adapter = get_openrouter_adapter()
    logger.info(f"Starting AskMyDocs with free model: {adapter.model}")
    uvicorn.run(app, host="0.0.0.0", port=7860)