|
|
|
import os |
|
import math |
|
import sqlite3 |
|
import fitz |
|
import re |
|
|
|
from dotenv import load_dotenv |
|
|
|
load_dotenv() |
|
|
|
from langgraph.graph import START, StateGraph, MessagesState, END |
|
from langgraph.prebuilt import tools_condition |
|
from langgraph.prebuilt import ToolNode |
|
|
from langchain_core.tools import tool |
|
from langchain.schema import SystemMessage |
|
|
|
|
|
|
|
|
|
|
from langchain.tools.retriever import create_retriever_tool |
|
|
|
|
|
|
|
|
|
|
from langchain_community.vectorstores.pinecone import Pinecone as LC_Pinecone |
|
|
|
|
|
|
|
from langchain.chat_models import ChatOpenAI |
|
from langchain_groq import ChatGroq |
|
from langchain_mistralai import ChatMistralAI |
|
from langchain.agents import initialize_agent, AgentType |
|
from langchain.schema import Document |
|
from langchain.chains import RetrievalQA |
|
from langchain.embeddings import OpenAIEmbeddings |
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
from langchain_community.vectorstores import FAISS
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
from langchain.prompts import PromptTemplate |
|
from langchain_community.document_loaders import TextLoader, PyMuPDFLoader |
|
from langchain_community.document_loaders.wikipedia import WikipediaLoader |
|
from langchain_community.document_loaders.arxiv import ArxivLoader |
|
from langchain_experimental.tools.python.tool import PythonREPLTool |
|
|
|
|
|
|
|
|
from langchain.tools import Tool |
|
from typing import List, Callable |
|
from langchain.schema import BaseMemory, AIMessage, HumanMessage, SystemMessage |
|
|
from langchain.llms.base import LLM |
|
from langchain.memory.chat_memory import BaseChatMemory |
|
from pydantic import PrivateAttr |
|
from langchain_core.messages import get_buffer_string |
|
|
|
|
|
|
|
from PIL import Image |
|
import pytesseract |
|
|
from groq import Groq |
|
import requests |
|
from io import BytesIO |
|
from transformers import pipeline, TrOCRProcessor, VisionEncoderDecoderModel |
|
|
import base64 |
|
from PIL import UnidentifiedImageError |
|
|
|
|
|
from typing import List, Dict |
|
import json |
|
|
|
|
from playwright.sync_api import sync_playwright |
|
from duckduckgo_search import DDGS |
|
import time |
|
import random |
|
import logging |
|
from functools import lru_cache, wraps |
|
|
from bs4 import BeautifulSoup |
|
import tenacity |
|
from tenacity import retry, stop_after_attempt, wait_exponential |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
|
|
|
|
import pandas as pd |
|
from PyPDF2 import PdfReader |
|
import docx |
|
|
import speech_recognition as sr |
|
from pydub import AudioSegment |
|
from pytube import YouTube |
|
from newspaper import Article |
|
|
from langchain_community.document_loaders.youtube import YoutubeLoader, TranscriptFormat |
|
|
|
|
|
|
try: |
|
from playwright.sync_api import sync_playwright |
|
_playwright_available = True |
|
except ImportError: |
|
_playwright_available = False |
|
|
|
|
|
_forbidden = ["porn", "sex", "xxx", "nude", "erotic", "nsfw", "explicit"]  # terms used to filter out explicit content from search results
|
|
|
|
|
|
|
|
|
|
|
os.environ.setdefault("OPENAI_API_KEY", "<YOUR_OPENAI_KEY>") |
|
os.environ["GROQ_API_KEY"] = os.getenv("GROQ_API_KEY", "default_key_or_placeholder") |
|
os.environ["MISTRAL_API_KEY"] = os.getenv("MISTRAL_API_KEY", "default_key_or_placeholder") |
|
|
|
|
|
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "default_key_or_placeholder") |
|
|
|
|
|
|
vector_store = None |
|
rag_chain = None |
|
DB_PATH = None |
|
DOC_PATH = None |
|
IMG_PATH = None |
|
OTH_PATH = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from retry_groq import RetryingChatGroq |
|
|
|
|
|
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0) |
|
|
|
|
|
|
|
|
|
|
|
@tool(parse_docstring=True) |
|
def multiply(a: int, b: int) -> int: |
|
""" |
|
Multiply two numbers. |
|
|
|
Args: |
|
a (int): The first factor. |
|
b (int): The second factor. |
|
|
|
Returns: |
|
int: The product of a and b. |
|
""" |
|
try: |
|
|
|
result = a * b |
|
return result |
|
except Exception as e: |
|
return f"Error in multiplication: {str(e)}" |
|
|
|
|
|
|
|
|
|
@tool(parse_docstring=True) |
|
def add(a: int, b: int) -> int: |
|
""" |
|
Add two numbers. |
|
|
|
Args: |
|
        a (int): The first addend.
|
        b (int): The second addend.
|
|
|
Returns: |
|
        int: The sum of a and b.
|
""" |
|
try: |
|
|
|
result = a + b |
|
return result |
|
except Exception as e: |
|
return f"Error in addition: {str(e)}" |
|
|
|
|
|
|
|
|
|
@tool(parse_docstring=True) |
|
def subtract(a: int, b: int) -> int: |
|
""" |
|
Subtract two numbers. |
|
|
|
Args: |
|
        a (int): The number to subtract from.
|
        b (int): The number to subtract.
|
|
|
Returns: |
|
        int: The difference of a and b (a - b).
|
""" |
|
try: |
|
|
|
result = a - b |
|
return result |
|
except Exception as e: |
|
return f"Error in subtraction: {str(e)}" |
|
|
|
|
|
|
|
|
|
@tool(parse_docstring=True) |
|
def divide(a: int, b: int) -> float:
|
""" |
|
Divide two numbers. |
|
|
|
Args: |
|
a (int): The numerator. |
|
b (int): The denominator. |
|
|
|
Returns: |
|
        float: The result of a divided by b, or an error message string if b is zero.
|
""" |
|
try: |
|
if b == 0: |
|
return "Error: Cannot divide by zero." |
|
|
|
result = a / b |
|
return result |
|
except Exception as e: |
|
return f"Error in division: {str(e)}" |
|
|
|
|
|
|
|
|
|
@tool(parse_docstring=True) |
|
def modulus(a: int, b: int) -> int: |
|
""" |
|
Get the modulus (remainder) of two numbers. |
|
|
|
Args: |
|
a (int): The dividend. |
|
b (int): The divisor. |
|
|
|
Returns: |
|
int: The remainder when a is divided by b. |
|
""" |
|
try: |
|
if b == 0: |
|
return "Error: Cannot calculate modulus with zero divisor." |
|
|
|
result = a % b |
|
return result |
|
except Exception as e: |
|
return f"Error in modulus calculation: {str(e)}" |
|
|
|
|
|
|
|
|
|
def with_retry(max_attempts: int = 3, backoff_base: int = 2): |
|
""" |
|
Decorator for retrying a function with exponential backoff on exception. |
|
""" |
|
def decorator(fn): |
|
@wraps(fn) |
|
def wrapper(*args, **kwargs): |
|
for attempt in range(max_attempts): |
|
try: |
|
return fn(*args, **kwargs) |
|
except Exception as e: |
|
wait = backoff_base ** attempt + random.uniform(0, 1) |
|
logger.warning(f"{fn.__name__} failed (attempt {attempt+1}/{max_attempts}): {e}") |
|
if attempt < max_attempts - 1: |
|
time.sleep(wait) |
|
logger.error(f"{fn.__name__} failed after {max_attempts} attempts.") |
|
return [] |
|
return wrapper |
|
return decorator |
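
# Minimal usage sketch for with_retry (illustrative only; `_example_fetch_json` is a
# hypothetical helper and is not registered as an agent tool): the call is retried
# with exponential backoff and returns [] if every attempt fails.
@with_retry(max_attempts=2)
def _example_fetch_json(url: str) -> List[Dict]:
    resp = requests.get(url, timeout=5)
    resp.raise_for_status()
    return resp.json()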
|
|
|
@with_retry() |
|
@lru_cache(maxsize=128) |
|
def tavily_search(query: str, top_k: int = 3) -> List[Dict]: |
|
"""Call Tavily API and return a list of result dicts.""" |
|
if not TAVILY_API_KEY: |
|
logger.info("[Tavily] No API key set. Skipping Tavily search.") |
|
return [] |
|
url = "https://api.tavily.com/search" |
|
headers = { |
|
"Authorization": f"Bearer {TAVILY_API_KEY}", |
|
"Content-Type": "application/json", |
|
} |
|
payload = {"query": query, "num_results": top_k} |
|
resp = requests.post(url, headers=headers, json=payload, timeout=10) |
|
resp.raise_for_status() |
|
data = resp.json() |
|
results = [] |
|
for item in data.get("results", []): |
|
results.append({ |
|
"title": item.get("title", ""), |
|
"url": item.get("url", ""), |
|
"content": item.get("content", "")[:200], |
|
"source": "Tavily" |
|
}) |
|
return results |
|
|
|
@with_retry() |
|
@lru_cache(maxsize=128) |
|
def duckduckgo_search(query: str, top_k: int = 3) -> List[Dict]: |
|
"""Query DuckDuckGo and return up to top_k raw SERP hits.""" |
|
results = [] |
|
try: |
|
with DDGS(timeout=15) as ddgs: |
|
for hit in ddgs.text(query, safesearch="On", max_results=top_k, timeout=15): |
|
results.append({ |
|
"title": hit.get("title", ""), |
|
"url": hit.get("href") or hit.get("url", ""), |
|
"content": hit.get("body", ""), |
|
"source": "DuckDuckGo" |
|
}) |
|
if len(results) >= top_k: |
|
break |
|
except Exception as e: |
|
logger.warning(f"DuckDuckGo search failed: {e}") |
|
|
|
|
|
return results |
|
|
|
|
|
def simple_google_search(query: str, top_k: int = 3) -> List[Dict]: |
|
"""Simplified Google search as a fallback when other methods fail.""" |
|
try: |
|
|
|
import urllib.parse |
|
import bs4 |
|
|
|
encoded_query = urllib.parse.quote(query) |
|
url = f"https://www.google.com/search?q={encoded_query}" |
|
|
|
headers = { |
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", |
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", |
|
"Accept-Language": "en-US,en;q=0.5", |
|
"Referer": "https://www.google.com/", |
|
"Connection": "keep-alive", |
|
} |
|
|
|
response = requests.get(url, headers=headers, timeout=20) |
|
response.raise_for_status() |
|
|
|
soup = bs4.BeautifulSoup(response.text, "html.parser") |
|
results = [] |
|
|
|
|
|
for result in soup.select("div.g")[:top_k]: |
|
title_elem = result.select_one("h3") |
|
link_elem = result.select_one("a") |
|
snippet_elem = result.select_one("div.VwiC3b") |
|
|
|
if title_elem and link_elem and snippet_elem and "href" in link_elem.attrs: |
|
href = link_elem["href"] |
|
if href.startswith("/url?q="): |
|
href = href.split("/url?q=")[1].split("&")[0] |
|
|
|
if href.startswith("http"): |
|
results.append({ |
|
"title": title_elem.get_text(), |
|
"url": href, |
|
"content": snippet_elem.get_text(), |
|
"source": "Google" |
|
}) |
|
|
|
return results |
|
|
|
except Exception as e: |
|
logger.warning(f"Simple Google search failed: {e}") |
|
return [] |
|
|
|
def hybrid_search(query: str, top_k: int = 3) -> List[Dict]: |
|
"""Combine multiple search sources with fallbacks.""" |
|
|
|
results = [] |
|
|
|
|
|
if TAVILY_API_KEY and TAVILY_API_KEY != "default_key_or_placeholder": |
|
try: |
|
tavily_results = tavily_search(query, top_k) |
|
results.extend(tavily_results) |
|
logger.info(f"Retrieved {len(tavily_results)} results from Tavily") |
|
except Exception as e: |
|
logger.warning(f"Tavily search failed: {e}") |
|
|
|
|
|
if len(results) < top_k: |
|
try: |
|
ddg_results = duckduckgo_search(query, top_k - len(results)) |
|
results.extend(ddg_results) |
|
logger.info(f"Retrieved {len(ddg_results)} results from DuckDuckGo") |
|
except Exception as e: |
|
logger.warning(f"DuckDuckGo search failed: {e}") |
|
|
|
|
|
if len(results) < top_k: |
|
try: |
|
google_results = simple_google_search(query, top_k - len(results)) |
|
results.extend(google_results) |
|
logger.info(f"Retrieved {len(google_results)} results from Google") |
|
except Exception as e: |
|
logger.warning(f"Google search failed: {e}") |
|
|
|
|
|
if not results: |
|
results.append({ |
|
"title": "Search Failed", |
|
"url": "", |
|
"content": f"Sorry, I couldn't find results for '{query}'. Please try refining your search terms or check your internet connection.", |
|
"source": "No results" |
|
}) |
|
|
|
return results[:top_k] |
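
# Each hybrid_search hit is a plain dict of the shape below (values invented for
# illustration); web_search then re-scrapes each URL before formatting its output.
#
#     {"title": "Example", "url": "https://example.com", "content": "snippet...", "source": "DuckDuckGo"}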
|
|
|
def format_search_docs(search_docs: List[Dict]) -> Dict[str, str]: |
|
""" |
|
    Turn a list of {source, page, content} dicts into a dict with a single
    "web_results" key whose value is one big string of <Document ...>…</Document>
    entries separated by `---`.
|
""" |
|
formatted_search_docs = "\n\n---\n\n".join( |
|
[ |
|
            f'<Document source="{doc["source"]}" page="{doc.get("page", "")}">\n'
|
f'{doc.get("content", "")}\n' |
|
f'</Document>' |
|
for doc in search_docs |
|
] |
|
) |
|
return {"web_results": formatted_search_docs} |
|
|
|
|
|
@tool(parse_docstring=True) |
|
def web_search(query: str, top_k: int = 3) -> Dict[str, str]: |
|
""" |
|
Perform a hybrid web search combining multiple search engines with robust fallbacks. |
|
|
|
Args: |
|
query: The search query string to look up. |
|
top_k: The maximum number of search results to return (default is 3). |
|
|
|
Returns: |
|
        A dictionary with a single "web_results" key whose value is a string of XML-like <Document> blocks, each containing:
|
- source: The URL of the webpage. |
|
- page: Placeholder for page identifier (empty string by default). |
|
- content: The first 200 words of the page text, cleaned of HTML tags. |
|
""" |
|
try: |
|
|
|
search_results = hybrid_search(query, top_k) |
|
results = [] |
|
|
|
|
|
for hit in search_results: |
|
url = hit.get("url") |
|
if not url: |
|
continue |
|
|
|
|
|
content = hit.get("content", "") |
|
title = hit.get("title", "") |
|
|
|
|
|
try: |
|
|
|
headers = { |
|
"User-Agent": random.choice([ |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", |
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15", |
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36", |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36 Edg/97.0.1072.62" |
|
]), |
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", |
|
"Accept-Language": "en-US,en;q=0.5", |
|
"Referer": "https://www.google.com/", |
|
"DNT": "1", |
|
"Connection": "keep-alive" |
|
} |
|
|
|
|
|
resp = requests.get(url, timeout=15, headers=headers) |
|
|
|
|
|
if resp.status_code == 200: |
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
|
|
|
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content') |
|
|
|
|
|
if main_content: |
|
extracted_text = main_content.get_text(separator=" ", strip=True) |
|
|
|
content = " ".join(extracted_text.split()[:200]) |
|
else: |
|
|
|
all_text = soup.get_text(separator=" ", strip=True) |
|
content = " ".join(all_text.split()[:200]) |
|
|
|
|
|
if len(content) < 50: |
|
content = hit.get("content", "")[:200] |
|
|
|
|
|
time.sleep(0.5 + random.random()) |
|
|
|
except requests.exceptions.HTTPError as e: |
|
logger.warning(f"HTTP error when scraping {url}: {e}") |
|
|
|
except requests.exceptions.RequestException as e: |
|
logger.warning(f"Request error when scraping {url}: {e}") |
|
|
|
except Exception as e: |
|
logger.warning(f"Unexpected error when scraping {url}: {e}") |
|
|
|
|
|
|
|
if any(f in content.lower() for f in _forbidden): |
|
continue |
|
|
|
|
|
results.append({ |
|
"source": url, |
|
"page": "", |
|
"content": content |
|
}) |
|
|
|
|
|
return format_search_docs(results[:top_k]) |
|
except Exception as e: |
|
logger.error(f"Web search failed: {e}") |
|
|
|
return format_search_docs([{ |
|
"source": "Error", |
|
"page": "", |
|
"content": f"Search failed with error: {e}. Please try again with different search terms." |
|
}]) |
|
|
|
|
|
|
|
|
|
@tool(parse_docstring=True) |
|
def download_file(url: str, dest_path: str) -> str: |
|
""" |
|
Download a file from a given URL and save it locally. |
|
|
|
Args: |
|
url: The direct URL of the file to download. |
|
dest_path: The local path to save the downloaded file. |
|
|
|
Returns: |
|
The destination path where the file was saved. |
|
""" |
|
r = requests.get(url, stream=True) |
|
r.raise_for_status() |
|
with open(dest_path, 'wb') as f: |
|
for chunk in r.iter_content(8192): |
|
f.write(chunk) |
|
return dest_path |
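
# Illustrative chaining of the file tools via .invoke (hypothetical URL and path):
#
#     path = download_file.invoke({"url": "https://example.com/report.xlsx",
#                                  "dest_path": "report.xlsx"})
#     csv_text = process_excel_to_text.invoke({"file_path": path})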
|
|
|
@tool(parse_docstring=True) |
|
def process_excel_to_text(file_path: str) -> str: |
|
""" |
|
Convert an Excel file into CSV-formatted text. |
|
|
|
Args: |
|
file_path: Path to the Excel (.xlsx) file. |
|
|
|
Returns: |
|
A string of CSV-formatted content extracted from the Excel file. |
|
""" |
|
try: |
|
|
|
import os |
|
if not os.path.exists(file_path): |
|
return f"Error: Excel file '{file_path}' does not exist." |
|
|
|
|
|
engines = ['openpyxl', 'xlrd', None] |
|
|
|
for engine in engines: |
|
try: |
|
|
|
if engine: |
|
df = pd.read_excel(file_path, engine=engine) |
|
else: |
|
df = pd.read_excel(file_path) |
|
return df.to_csv(index=False) |
|
except Exception as e: |
|
print(f"Excel engine {engine} failed: {e}") |
|
last_error = e |
|
continue |
|
|
|
|
|
return f"Error processing Excel file: {str(last_error)}" |
|
except Exception as e: |
|
return f"Error with Excel file: {str(e)}" |
|
|
|
@tool(parse_docstring=True) |
|
def read_text_from_pdf(file_path: str, question: str = None) -> str: |
|
""" |
|
Extract text from a PDF file, chunking large documents if needed. |
|
|
|
Args: |
|
file_path: Path to the PDF file. |
|
question: Optional question to help retrieve relevant parts of long documents. |
|
|
|
Returns: |
|
The extracted text content, potentially chunked if the document is large. |
|
""" |
|
try: |
|
|
|
import os |
|
if not os.path.exists(file_path): |
|
return f"Error: PDF file '{file_path}' does not exist." |
|
|
|
reader = PdfReader(file_path) |
|
full_text = "\n".join([page.extract_text() or "" for page in reader.pages]) |
|
|
|
|
|
if question and len(full_text) > 5000: |
|
return process_large_document(full_text, question) |
|
|
|
return full_text |
|
except Exception as e: |
|
return f"Error reading PDF: {str(e)}" |
|
|
|
@tool(parse_docstring=True) |
|
def read_text_from_docx(file_path: str, question: str = None) -> str: |
|
""" |
|
Extract text from a DOCX (Word) document, chunking large documents if needed. |
|
|
|
Args: |
|
file_path: Path to the DOCX file. |
|
question: Optional question to help retrieve relevant parts of long documents. |
|
|
|
Returns: |
|
The extracted text, potentially chunked if the document is large. |
|
""" |
|
try: |
|
|
|
import os |
|
if not os.path.exists(file_path): |
|
return f"Error: File '{file_path}' does not exist." |
|
|
|
try: |
|
doc = docx.Document(file_path) |
|
full_text = "\n".join([para.text for para in doc.paragraphs]) |
|
except Exception as docx_err: |
|
|
|
if "Package not found" in str(docx_err): |
|
|
|
try: |
|
import zipfile |
|
from xml.etree.ElementTree import XML |
|
|
|
WORD_NAMESPACE = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}' |
|
PARA = WORD_NAMESPACE + 'p' |
|
TEXT = WORD_NAMESPACE + 't' |
|
|
|
with zipfile.ZipFile(file_path) as docx_file: |
|
with docx_file.open('word/document.xml') as document: |
|
tree = XML(document.read()) |
|
paragraphs = [] |
|
for paragraph in tree.iter(PARA): |
|
texts = [node.text for node in paragraph.iter(TEXT) if node.text] |
|
if texts: |
|
paragraphs.append(''.join(texts)) |
|
full_text = '\n'.join(paragraphs) |
|
except Exception as e: |
|
return f"Error reading DOCX file: {str(e)}" |
|
else: |
|
return f"Error reading DOCX file: {str(docx_err)}" |
|
|
|
|
|
if question and len(full_text) > 5000: |
|
return process_large_document(full_text, question) |
|
|
|
return full_text |
|
except Exception as e: |
|
return f"Error reading DOCX file: {str(e)}" |
|
|
|
|
|
@tool(parse_docstring=True) |
|
def transcribe_audio(file_path: str) -> str: |
|
""" |
|
Transcribe speech from a local audio file to text. |
|
|
|
Args: |
|
file_path: Path to the audio file. |
|
|
|
Returns: |
|
Transcribed text using Google Web Speech API. |
|
""" |
|
try: |
|
|
|
import os |
|
if not os.path.exists(file_path): |
|
return f"Error: Audio file '{file_path}' does not exist." |
|
|
|
|
|
if not file_path.lower().endswith('.wav'): |
|
try: |
|
from pydub import AudioSegment |
|
temp_wav = os.path.splitext(file_path)[0] + "_temp.wav" |
|
audio = AudioSegment.from_file(file_path) |
|
audio.export(temp_wav, format="wav") |
|
file_path = temp_wav |
|
except Exception as e: |
|
return f"Failed to convert audio to WAV format: {str(e)}" |
|
|
|
recognizer = sr.Recognizer() |
|
with sr.AudioFile(file_path) as src: |
|
audio = recognizer.record(src) |
|
return recognizer.recognize_google(audio) |
|
except Exception as e: |
|
if "Audio file could not be read" in str(e): |
|
return f"Error: Audio format not supported. Try converting to WAV, MP3, OGG, or FLAC." |
|
return f"Error transcribing audio: {str(e)}" |
|
|
|
@tool(parse_docstring=True) |
|
def youtube_audio_processing(youtube_url: str) -> str: |
|
""" |
|
Download and transcribe audio from a YouTube video. |
|
|
|
Args: |
|
youtube_url: URL of the YouTube video. |
|
|
|
Returns: |
|
Transcription text extracted from the video's audio. |
|
""" |
|
yt = YouTube(youtube_url) |
|
audio_stream = yt.streams.filter(only_audio=True).first() |
|
out_file = audio_stream.download(output_path='.', filename='yt_audio') |
|
wav_path = 'yt_audio.wav' |
|
AudioSegment.from_file(out_file).export(wav_path, format='wav') |
|
    # transcribe_audio is a structured tool, so call it via .invoke with its argument dict
    return transcribe_audio.invoke({"file_path": wav_path})
|
|
|
@tool(parse_docstring=True) |
|
def extract_article_text(url: str, question: str = None) -> str: |
|
""" |
|
Download and extract the main article content from a webpage, chunking large articles if needed. |
|
|
|
Args: |
|
url: The URL of the article to extract. |
|
question: Optional question to help retrieve relevant parts of long articles. |
|
|
|
Returns: |
|
The article's textual content, potentially chunked if large. |
|
""" |
|
try: |
|
art = Article(url) |
|
art.download() |
|
art.parse() |
|
full_text = art.text |
|
|
|
|
|
if question and len(full_text) > 5000: |
|
return process_large_document(full_text, question) |
|
|
|
return full_text |
|
except Exception as e: |
|
return f"Error extracting article: {str(e)}" |
|
|
|
|
|
|
|
|
|
|
|
@tool(parse_docstring=True) |
|
def arvix_search(query: str) -> Dict[str, str]: |
|
""" |
|
Search for academic papers on ArXiv. |
|
|
|
Args: |
|
query: The search term to look for in ArXiv. |
|
|
|
Returns: |
|
A dictionary of up to 3 relevant paper entries in JSON format. |
|
""" |
|
papers = ArxivLoader(query=query, load_max_docs=3).load() |
|
results = [] |
|
for doc in papers: |
|
try: |
|
|
|
source = doc.metadata.get("source", "ArXiv") |
|
doc_id = doc.metadata.get("id", doc.metadata.get("entry_id", "")) |
|
result = { |
|
"source": source, |
|
"id": doc_id, |
|
"summary": doc.page_content[:1000] if hasattr(doc, "page_content") else str(doc)[:1000], |
|
} |
|
results.append(result) |
|
except Exception as e: |
|
|
|
results.append({ |
|
"source": "ArXiv Error", |
|
"id": "error", |
|
"summary": f"Error processing paper: {str(e)}" |
|
}) |
|
|
|
return {"arvix_results": json.dumps(results)} |
|
|
|
@tool(parse_docstring=True) |
|
def answer_youtube_video_question( |
|
youtube_url: str, |
|
question: str, |
|
chunk_size_seconds: int = 30 |
|
) -> str: |
|
""" |
|
Answer a question based on a YouTube video's transcript. |
|
|
|
Args: |
|
youtube_url: URL of the YouTube video. |
|
question: The question to be answered using video content. |
|
chunk_size_seconds: Duration of each transcript chunk. |
|
|
|
Returns: |
|
The answer to the question generated from the video transcript. |
|
""" |
|
loader = YoutubeLoader.from_youtube_url( |
|
youtube_url, |
|
add_video_info=True, |
|
transcript_format=TranscriptFormat.CHUNKS, |
|
chunk_size_seconds=chunk_size_seconds, |
|
) |
|
documents = loader.load() |
|
embeddings = HuggingFaceEmbeddings(model_name='sentence-transformers/all-mpnet-base-v2') |
|
vectorstore = FAISS.from_documents(documents, embeddings) |
|
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False) |
|
qa_chain = RetrievalQA.from_chain_type(llm=llm, retriever=vectorstore.as_retriever()) |
|
return qa_chain.run(question) |
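
# Illustrative call (placeholder URL; requires network access and an available transcript):
#
#     answer_youtube_video_question.invoke({
#         "youtube_url": "https://www.youtube.com/watch?v=<VIDEO_ID>",
#         "question": "What is the main topic of the video?",
#     })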
|
|
|
|
|
|
|
|
|
|
|
python_repl = PythonREPLTool() |
|
|
|
|
|
|
|
|
|
|
|
@tool(parse_docstring=True) |
|
def wiki_search(query: str) -> Dict[str, str]:
|
""" |
|
Search Wikipedia for information on a given topic. |
|
|
|
Args: |
|
query: The search term for Wikipedia. |
|
|
|
Returns: |
|
        A dictionary with up to 3 formatted summary results.
|
""" |
|
|
|
pages = WikipediaLoader(query=query, load_max_docs=3).load() |
|
results: List[Dict] = [] |
|
for doc in pages: |
|
results.append({ |
|
"source": doc.metadata["source"], |
|
"page": doc.metadata.get("page", ""), |
|
"content": doc.page_content[:1000], |
|
}) |
|
return {"wiki_results": format_search_docs(results)} |
|
|
|
|
|
|
|
|
|
|
|
def _load_image(img_path: str, resize_to=(512, 512)) -> Image.Image: |
|
""" |
|
Load, verify, convert, and resize an image. |
|
Raises ValueError on failure. |
|
""" |
|
if not img_path: |
|
raise ValueError("No image path provided.") |
|
try: |
|
with Image.open(img_path) as img: |
|
img.verify() |
|
img = Image.open(img_path).convert("RGB") |
|
img = img.resize(resize_to) |
|
return img |
|
except UnidentifiedImageError: |
|
raise ValueError(f"File at {img_path} is not a valid image.") |
|
except Exception as e: |
|
raise ValueError(f"Failed to load image at {img_path}: {e}") |
|
|
|
def _encode_image_to_base64(img_path: str) -> str: |
|
""" |
|
    Load an image, save optimized PNG into memory, and base64-encode it.
|
""" |
|
img = _load_image(img_path) |
|
buffer = BytesIO() |
|
img.save(buffer, format="PNG", optimize=True) |
|
return base64.b64encode(buffer.getvalue()).decode("utf-8") |
|
|
|
@tool |
|
def image_processing(prompt: str, img_path: str) -> str: |
|
"""Process an image using a vision LLM, with OCR fallback. |
|
|
|
Args: |
|
prompt: Instruction or question related to the image. |
|
img_path: Path to the image file. |
|
|
|
Returns: |
|
The model's response or fallback OCR result. |
|
""" |
|
try: |
|
import os |
|
|
|
if not os.path.exists(img_path): |
|
return f"Error: Image file '{img_path}' does not exist." |
|
|
|
try: |
|
b64 = _encode_image_to_base64(img_path) |
|
|
|
md = f"{prompt}\n\n" |
|
message = HumanMessage(content=md) |
|
|
|
llm = RetryingChatGroq(model="meta-llama/llama-4-maverick-17b-128e-instruct", streaming=False, temperature=0) |
|
try: |
|
resp = llm.invoke([message]) |
|
if hasattr(resp, 'content'): |
|
return resp.content.strip() |
|
elif isinstance(resp, str): |
|
return resp.strip() |
|
else: |
|
|
|
return str(resp) |
|
except Exception as invoke_err: |
|
print(f"[LLM invoke error] {invoke_err}") |
|
|
|
raise ValueError("LLM invocation failed") |
|
except Exception as llama_err: |
|
print(f"[LLM vision failed] {llama_err}") |
|
try: |
|
img = _load_image(img_path) |
|
return pytesseract.image_to_string(img).strip() |
|
except Exception as ocr_err: |
|
print(f"[OCR fallback failed] {ocr_err}") |
|
return "Unable to process the image. Please check the file and try again." |
|
except Exception as e: |
|
|
|
print(f"[image_processing error] {e}") |
|
return f"Error processing image: {str(e)}" |
|
|
|
python_repl_tool = PythonREPLTool() |
|
|
|
@tool |
|
def echo(text: str) -> str: |
|
"""Echo back the input text. |
|
|
|
Args: |
|
text: The string to be echoed. |
|
|
|
Returns: |
|
The same text that was provided as input. |
|
""" |
|
return text |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from langgraph.prebuilt.chat_agent_executor import create_react_agent, AgentState |
|
from langchain.chat_models import init_chat_model |
|
|
|
|
|
|
|
def build_graph(provider: str = "groq"): |
|
"""Construct and compile the multiβagent GAIA workflow StateGraph. |
|
|
|
    This graph wires together three React-style agents into a streamlined pipeline:

        PerceptionAgent → ActionAgent → EvaluationAgent (with appropriate entry/exit points)
|
|
|
The agents have the following responsibilities: |
|
- PerceptionAgent: Handles web searches, Wikipedia, ArXiv, and image processing |
|
- ActionAgent: Performs calculations, file operations, and code analysis |
|
- EvaluationAgent: Reviews results and ensures the final answer is properly formatted |
|
|
|
Args: |
|
provider: The name of the LLM provider. Must be "groq". |
|
|
|
Returns: |
|
CompiledGraph: A compiled LangGraph state machine ready for invocation. |
|
|
|
Raises: |
|
ValueError: If `provider` is anything other than "groq". |
|
""" |
|
try: |
|
if provider != "groq": |
|
raise ValueError("Invalid provider. Expected 'groq'.") |
|
|
|
|
|
try: |
|
logger.info("Initializing LLM with model: deepseek-r1-distill-llama-70b") |
|
api_key = os.getenv("GROQ_API_KEY") |
|
if not api_key or api_key == "default_key_or_placeholder": |
|
logger.error("GROQ_API_KEY is not set or is using placeholder value") |
|
raise ValueError("GROQ_API_KEY environment variable is not set properly. Please set a valid API key.") |
|
|
|
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", temperature=0) |
|
logger.info("LLM initialized successfully") |
|
except Exception as e: |
|
logger.error(f"Error initializing LLM: {str(e)}") |
|
raise |
|
|
|
|
|
sys_msg = SystemMessage(content=""" |
|
You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template: |
|
|
|
FINAL ANSWER: [YOUR FINAL ANSWER] |
|
|
|
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma-separated list of numbers and/or strings. |
|
|
|
If you are asked for a number, don't use commas or units (e.g., $, %, kg) unless specified otherwise. |
|
|
|
If you are asked for a string, don't use articles (a, an, the), and don't use abbreviations (e.g., for states). |
|
|
|
If you are asked for a comma-separated list, apply the above rules to each element in the list. |
|
""".strip()) |
|
|
|
|
|
eval_sys_msg = SystemMessage(content=""" |
|
You are a specialized evaluation agent. Your job is to review the work done by other agents |
|
and provide a final, properly formatted answer. |
|
|
|
IMPORTANT: You MUST ALWAYS format your answer using this exact template: |
|
|
|
FINAL ANSWER: [concise answer] |
|
|
|
Rules for formatting the answer: |
|
1. The answer must be extremely concise - use as few words as possible |
|
2. For numeric answers, provide only the number without units unless units are specifically requested |
|
3. For text answers, avoid articles (a, an, the) and unnecessary words |
|
4. For list answers, use a comma-separated format |
|
5. NEVER explain your reasoning in the FINAL ANSWER section |
|
6. NEVER skip the "FINAL ANSWER:" prefix |
|
|
|
Example good answers: |
|
FINAL ANSWER: 42 |
|
FINAL ANSWER: Paris |
|
FINAL ANSWER: 1912, 1945, 1989 |
|
|
|
Example bad answers (don't do these): |
|
- Based on my analysis, the answer is 42. |
|
- I think it's Paris because that's the capital of France. |
|
- The years were 1912, 1945, and 1989. |
|
|
|
Remember: ALWAYS include "FINAL ANSWER:" followed by the most concise answer possible. |
|
""".strip()) |
|
|
|
|
|
logger.info("Setting up agent tools") |
|
perception_tools = [web_search, wiki_search, news_article_search, arvix_search, image_processing, echo] |
|
execution_tools = [ |
|
multiply, add, subtract, divide, modulus, |
|
download_file, process_excel_to_text, |
|
read_text_from_pdf, read_text_from_docx, |
|
transcribe_audio, youtube_audio_processing, |
|
extract_article_text, answer_youtube_video_question, |
|
python_repl_tool, analyze_code, read_code_file, analyze_python_function |
|
] |
|
|
|
|
|
logger.info("Creating agents") |
|
try: |
|
|
|
PerceptionAgent = create_react_agent( |
|
model=llm, |
|
tools=perception_tools, |
|
prompt=sys_msg, |
|
state_schema=AgentState, |
|
name="PerceptionAgent" |
|
) |
|
logger.info("Created PerceptionAgent successfully") |
|
|
|
|
|
ActionAgent = create_react_agent( |
|
model=llm, |
|
tools=execution_tools, |
|
prompt=sys_msg, |
|
state_schema=AgentState, |
|
name="ActionAgent" |
|
) |
|
logger.info("Created ActionAgent successfully") |
|
|
|
|
|
EvaluationAgent = create_react_agent( |
|
model=llm, |
|
tools=[], |
|
prompt=eval_sys_msg, |
|
state_schema=AgentState, |
|
name="EvaluationAgent" |
|
) |
|
logger.info("Created EvaluationAgent successfully") |
|
except Exception as e: |
|
logger.error(f"Error creating agent: {str(e)}") |
|
import traceback |
|
logger.error(f"Traceback: {traceback.format_exc()}") |
|
raise |
|
|
|
|
|
logger.info("Building StateGraph") |
|
try: |
|
builder = StateGraph(AgentState) |
|
|
|
|
|
builder.add_node("PerceptionAgent", PerceptionAgent) |
|
builder.add_node("ActionAgent", ActionAgent) |
|
builder.add_node("EvaluationAgent", EvaluationAgent) |
|
|
|
|
|
builder.set_entry_point("PerceptionAgent") |
|
|
|
|
|
builder.add_edge("PerceptionAgent", "ActionAgent") |
|
builder.add_edge("ActionAgent", "EvaluationAgent") |
|
|
|
|
|
builder.set_finish_point("EvaluationAgent") |
|
|
|
logger.info("Compiling StateGraph") |
|
return builder.compile() |
|
except Exception as e: |
|
logger.error(f"Error building graph: {str(e)}") |
|
import traceback |
|
logger.error(f"Traceback: {traceback.format_exc()}") |
|
raise |
|
except Exception as e: |
|
logger.error(f"Overall error in build_graph: {str(e)}") |
|
import traceback |
|
logger.error(f"Traceback: {traceback.format_exc()}") |
|
raise |
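
# Minimal invocation sketch (illustrative; it assumes a valid GROQ_API_KEY is set.
# See the __main__ block at the bottom of the file for a runnable version):
#
#     graph = build_graph(provider="groq")
#     state = graph.invoke({"messages": [HumanMessage(content="What is 6 * 7?")]})
#     answer = get_final_answer(state["messages"][-1].content)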
|
|
|
def get_final_answer(text): |
|
"""Extract just the FINAL ANSWER from the model's response. |
|
|
|
Args: |
|
text: The full text response from the LLM |
|
|
|
Returns: |
|
str: The extracted answer without the "FINAL ANSWER:" prefix |
|
""" |
|
|
|
logger.debug(f"Extracting answer from: {text[:200]}...") |
|
|
|
if not text: |
|
logger.warning("Empty response received") |
|
return "No answer provided." |
|
|
|
|
|
pattern = r'(?:^|\n)FINAL ANSWER:\s*(.*?)(?:\n\s*$|$)' |
|
match = re.search(pattern, text, re.DOTALL | re.IGNORECASE) |
|
if match: |
|
|
|
logger.debug("Found answer using pattern 1") |
|
return match.group(1).strip() |
|
|
|
|
|
for variant in ["FINAL ANSWER:", "FINAL_ANSWER:", "Final Answer:", "Answer:"]: |
|
lines = text.split('\n') |
|
for i, line in enumerate(reversed(lines)): |
|
if variant in line: |
|
|
|
logger.debug(f"Found answer using variant: {variant}") |
|
answer = line[line.find(variant) + len(variant):].strip() |
|
if answer: |
|
return answer |
|
|
|
if i > 0: |
|
next_line = lines[len(lines) - i] |
|
if next_line.strip(): |
|
return next_line.strip() |
|
|
|
|
|
for phrase in ["The answer is", "The result is", "We get", "Therefore,", "In conclusion,"]: |
|
phrase_pos = text.find(phrase) |
|
if phrase_pos != -1: |
|
|
|
sentence_end = text.find(".", phrase_pos) |
|
if sentence_end != -1: |
|
logger.debug(f"Found answer using phrase: {phrase}") |
|
return text[phrase_pos + len(phrase):sentence_end].strip() |
|
|
|
|
|
paragraphs = text.strip().split('\n\n') |
|
for para in reversed(paragraphs): |
|
para = para.strip() |
|
if para and not para.startswith("I ") and not para.lower().startswith("to "): |
|
logger.debug("Using last meaningful paragraph") |
|
|
|
if len(para) > 100: |
|
sentences = re.split(r'[.!?]', para) |
|
for sentence in reversed(sentences): |
|
sent = sentence.strip() |
|
if sent and len(sent) > 5 and not sent.startswith("I "): |
|
return sent |
|
return para |
|
|
|
|
|
lines = text.strip().split('\n') |
|
for line in reversed(lines): |
|
line = line.strip() |
|
if line and len(line) > 3: |
|
logger.debug("Using last line with content") |
|
return line |
|
|
|
|
|
logger.warning("Could not find a properly formatted answer") |
|
return text[:100] + "..." if len(text) > 100 else text |
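
# Example of the extraction behaviour (input text is invented):
#
#     get_final_answer("Some reasoning...\nFINAL ANSWER: Paris")   # -> "Paris"
#     get_final_answer("The answer is 42. More text.")             # -> "42"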
|
|
|
|
|
if __name__ == "__main__": |
|
question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?" |
|
|
|
graph = build_graph(provider="groq") |
|
|
|
messages = [HumanMessage(content=question)] |
|
messages = graph.invoke({"messages": messages}) |
|
for m in messages["messages"]: |
|
m.pretty_print() |
|
|
|
|
|
@tool |
|
def analyze_code(code_string: str) -> str: |
|
"""Analyze a string of code to understand its structure, functionality, and potential issues. |
|
|
|
Args: |
|
code_string: The code to analyze as a string. |
|
|
|
Returns: |
|
A structured analysis of the code including functions, classes, and key operations. |
|
""" |
|
try: |
|
import ast |
|
|
|
|
|
try: |
|
parsed = ast.parse(code_string) |
|
|
|
|
|
functions = [node.name for node in ast.walk(parsed) if isinstance(node, ast.FunctionDef)] |
|
classes = [node.name for node in ast.walk(parsed) if isinstance(node, ast.ClassDef)] |
|
imports = [node.names[0].name for node in ast.walk(parsed) if isinstance(node, ast.Import)] |
|
imports.extend([f"{node.module}.{name.name}" if node.module else name.name |
|
for node in ast.walk(parsed) if isinstance(node, ast.ImportFrom) |
|
for name in node.names]) |
|
|
|
|
|
num_loops = len([node for node in ast.walk(parsed) |
|
if isinstance(node, (ast.For, ast.While))]) |
|
num_conditionals = len([node for node in ast.walk(parsed) |
|
if isinstance(node, (ast.If, ast.IfExp))]) |
|
|
|
analysis = { |
|
"language": "Python", |
|
"functions": functions, |
|
"classes": classes, |
|
"imports": imports, |
|
"complexity": { |
|
"functions": len(functions), |
|
"classes": len(classes), |
|
"loops": num_loops, |
|
"conditionals": num_conditionals |
|
} |
|
} |
|
return str(analysis) |
|
except SyntaxError: |
|
|
|
if "{" in code_string and "}" in code_string: |
|
if "function" in code_string or "=>" in code_string: |
|
language = "JavaScript/TypeScript" |
|
elif "func" in code_string or "struct" in code_string: |
|
language = "Go or Rust" |
|
elif "public" in code_string or "private" in code_string or "class" in code_string: |
|
language = "Java/C#/C++" |
|
else: |
|
language = "Unknown C-like language" |
|
elif "<" in code_string and ">" in code_string and ("/>" in code_string or "</"): |
|
language = "HTML/XML/JSX" |
|
else: |
|
language = "Unknown" |
|
|
|
return f"Non-Python code detected ({language}). Basic code structure analysis not available." |
|
except Exception as e: |
|
return f"Error analyzing code: {str(e)}" |
|
|
|
@tool |
|
def read_code_file(file_path: str) -> str: |
|
"""Read a code file and return its contents with proper syntax detection. |
|
|
|
Args: |
|
file_path: Path to the code file. |
|
|
|
Returns: |
|
The file contents and detected language. |
|
""" |
|
try: |
|
|
|
import os |
|
if not os.path.exists(file_path): |
|
return f"Error: File '{file_path}' does not exist." |
|
|
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
content = f.read() |
|
|
|
|
|
ext = os.path.splitext(file_path)[1].lower() |
|
|
|
language_map = { |
|
'.py': 'Python', |
|
'.js': 'JavaScript', |
|
'.ts': 'TypeScript', |
|
'.html': 'HTML', |
|
'.css': 'CSS', |
|
'.java': 'Java', |
|
'.c': 'C', |
|
'.cpp': 'C++', |
|
'.cs': 'C#', |
|
'.go': 'Go', |
|
'.rs': 'Rust', |
|
'.php': 'PHP', |
|
'.rb': 'Ruby', |
|
'.sh': 'Shell', |
|
'.bat': 'Batch', |
|
'.ps1': 'PowerShell', |
|
'.sql': 'SQL', |
|
'.json': 'JSON', |
|
'.xml': 'XML', |
|
'.yaml': 'YAML', |
|
'.yml': 'YAML', |
|
} |
|
|
|
language = language_map.get(ext, 'Unknown') |
|
|
|
return f"File content ({language}):\n\n{content}" |
|
except Exception as e: |
|
return f"Error reading file: {str(e)}" |
|
|
|
@tool |
|
def analyze_python_function(function_name: str, code_string: str) -> str: |
|
"""Extract and analyze a specific function from Python code. |
|
|
|
Args: |
|
function_name: The name of the function to analyze. |
|
code_string: The complete code containing the function. |
|
|
|
Returns: |
|
Analysis of the function including parameters, return type, and docstring. |
|
""" |
|
try: |
|
import ast |
|
import inspect |
|
from types import CodeType, FunctionType |
|
|
|
|
|
parsed = ast.parse(code_string) |
|
|
|
|
|
function_def = None |
|
for node in ast.walk(parsed): |
|
if isinstance(node, ast.FunctionDef) and node.name == function_name: |
|
function_def = node |
|
break |
|
|
|
if not function_def: |
|
return f"Function '{function_name}' not found in the provided code." |
|
|
|
|
|
params = [] |
|
for arg in function_def.args.args: |
|
param_name = arg.arg |
|
|
|
if arg.annotation: |
|
if isinstance(arg.annotation, ast.Name): |
|
param_type = arg.annotation.id |
|
elif isinstance(arg.annotation, ast.Attribute): |
|
param_type = f"{arg.annotation.value.id}.{arg.annotation.attr}" |
|
else: |
|
param_type = "complex_type" |
|
params.append(f"{param_name}: {param_type}") |
|
else: |
|
params.append(param_name) |
|
|
|
|
|
return_type = None |
|
if function_def.returns: |
|
if isinstance(function_def.returns, ast.Name): |
|
return_type = function_def.returns.id |
|
elif isinstance(function_def.returns, ast.Attribute): |
|
return_type = f"{function_def.returns.value.id}.{function_def.returns.attr}" |
|
else: |
|
return_type = "complex_return_type" |
|
|
|
|
|
docstring = ast.get_docstring(function_def) |
|
|
|
|
|
summary = { |
|
"function_name": function_name, |
|
"parameters": params, |
|
"return_type": return_type, |
|
"docstring": docstring, |
|
"decorators": [d.id if isinstance(d, ast.Name) else "complex_decorator" for d in function_def.decorator_list], |
|
"line_count": len(function_def.body) |
|
} |
|
|
|
|
|
result = f"Function '{function_name}' analysis:\n" |
|
result += f"- Parameters: {', '.join(params)}\n" |
|
result += f"- Return type: {return_type or 'None specified'}\n" |
|
result += f"- Docstring: {docstring or 'None'}\n" |
|
result += f"- Line count: {len(function_def.body)}" |
|
|
|
return result |
|
except Exception as e: |
|
return f"Error analyzing function: {str(e)}" |
|
|
|
|
|
|
|
|
|
|
|
@tool |
|
def news_article_search(query: str, top_k: int = 3) -> Dict[str, str]: |
|
"""Search for and retrieve news articles with robust error handling for news sites. |
|
|
|
Args: |
|
query: The news topic or keywords to search for. |
|
top_k: Maximum number of articles to retrieve. |
|
|
|
Returns: |
|
A dictionary with search results formatted as XML-like document entries. |
|
""" |
|
|
|
results = [] |
|
news_sources = [ |
|
"bbc.com", "reuters.com", "apnews.com", "nasa.gov", |
|
"space.com", "universetoday.com", "nature.com", "science.org", |
|
"scientificamerican.com", "nytimes.com", "theguardian.com" |
|
] |
|
|
|
|
|
try: |
|
with DDGS() as ddgs: |
|
search_query = f"{query} site:{' OR site:'.join(news_sources)}" |
|
for hit in ddgs.text(search_query, safesearch="On", max_results=top_k*2): |
|
url = hit.get("href") or hit.get("url", "") |
|
if not url: |
|
continue |
|
|
|
|
|
result = { |
|
"source": url, |
|
"page": "", |
|
"content": hit.get("body", "")[:250], |
|
"title": hit.get("title", "") |
|
} |
|
|
|
|
|
try: |
|
headers = { |
|
"User-Agent": random.choice([ |
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36", |
|
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.0 Safari/605.1.15", |
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36" |
|
]), |
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", |
|
"Accept-Language": "en-US,en;q=0.5", |
|
"Referer": "https://www.google.com/", |
|
"DNT": "1", |
|
"Connection": "keep-alive", |
|
"Upgrade-Insecure-Requests": "1" |
|
} |
|
|
|
|
|
time.sleep(1 + random.random()) |
|
|
|
|
|
from newspaper import Article |
|
article = Article(url) |
|
article.download() |
|
article.parse() |
|
|
|
|
|
if article.text and len(article.text) > 100: |
|
|
|
paragraphs = article.text.split('\n\n') |
|
first_para = paragraphs[0] if paragraphs else "" |
|
summary = first_para[:300] |
|
if len(paragraphs) > 1: |
|
summary += "... " + paragraphs[1][:200] |
|
|
|
result["content"] = summary |
|
if article.title: |
|
result["title"] = article.title |
|
|
|
except Exception as article_err: |
|
logger.warning(f"Article extraction failed for {url}: {article_err}") |
|
|
|
try: |
|
resp = requests.get(url, timeout=12, headers=headers) |
|
resp.raise_for_status() |
|
soup = BeautifulSoup(resp.text, "html.parser") |
|
|
|
|
|
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content') |
|
|
|
if main_content: |
|
content = " ".join(main_content.get_text(separator=" ", strip=True).split()[:250]) |
|
result["content"] = content |
|
except Exception as req_err: |
|
logger.warning(f"Fallback extraction failed for {url}: {req_err}") |
|
|
|
|
|
results.append(result) |
|
if len(results) >= top_k: |
|
break |
|
|
|
except Exception as e: |
|
logger.error(f"News search failed: {e}") |
|
return format_search_docs([{ |
|
"source": "Error", |
|
"page": "", |
|
"content": f"Failed to retrieve news articles for '{query}': {str(e)}" |
|
}]) |
|
|
|
if not results: |
|
|
|
logger.info(f"No news results found, falling back to web_search for {query}") |
|
        # web_search is a structured tool, so call it via .invoke with its argument dict
        return web_search.invoke({"query": query, "top_k": top_k})
|
|
|
return format_search_docs(results[:top_k]) |
|
|
|
|
|
def chunk_document(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]: |
|
""" |
|
Split a large document into smaller chunks with overlap to maintain context across chunks. |
|
|
|
Args: |
|
text: The document text to split into chunks |
|
chunk_size: Maximum size of each chunk in characters |
|
overlap: Number of characters to overlap between chunks |
|
|
|
Returns: |
|
List of text chunks |
|
""" |
|
|
|
if len(text) <= chunk_size: |
|
return [text] |
|
|
|
chunks = [] |
|
start = 0 |
|
|
|
while start < len(text): |
|
|
|
end = min(start + chunk_size, len(text)) |
|
|
|
|
|
if end < len(text): |
|
|
|
for sentence_end in ['. ', '? ', '! ']: |
|
last_period = text[start:end].rfind(sentence_end) |
|
if last_period != -1: |
|
end = start + last_period + 2 |
|
break |
|
|
|
|
|
chunks.append(text[start:end]) |
|
|
|
|
|
start = end - overlap if end < len(text) else len(text) |
|
|
|
return chunks |
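
# Illustrative behaviour (invented text): a roughly 2,500-character document with
# chunk_size=1000 and overlap=100 yields a handful of chunks, each ending at a
# sentence boundary where possible and re-starting 100 characters before the
# previous chunk ended.
#
#     chunks = chunk_document("A long report. " * 170, chunk_size=1000, overlap=100)
#     # len(chunks) -> 3 or 4, depending on where the sentence breaks fall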
|
|
|
|
|
def process_large_document(text: str, question: str, llm=None) -> str: |
|
""" |
|
Process a large document by chunking it and using retrieval to find relevant parts. |
|
|
|
Args: |
|
text: The document text to process |
|
question: The question being asked about the document |
|
llm: Optional language model to use (defaults to agent's LLM) |
|
|
|
Returns: |
|
Summarized answer based on relevant chunks |
|
""" |
|
if not llm: |
|
llm = RetryingChatGroq(model="deepseek-r1-distill-llama-70b", streaming=False, temperature=0) |
|
|
|
|
|
chunks = chunk_document(text) |
|
|
|
|
|
if len(chunks) <= 1: |
|
return text |
|
|
|
|
|
try: |
|
from langchain_community.embeddings import HuggingFaceEmbeddings |
|
        from langchain_community.vectorstores import FAISS
|
from langchain.schema import Document |
|
|
|
|
|
documents = [Document(page_content=chunk, metadata={"chunk_id": i}) for i, chunk in enumerate(chunks)] |
|
|
|
|
|
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") |
|
vectorstore = FAISS.from_documents(documents, embeddings) |
|
|
|
|
|
relevant_chunks = vectorstore.similarity_search(question, k=2) |
|
|
|
|
|
relevant_text = "\n\n".join([doc.page_content for doc in relevant_chunks]) |
|
|
|
|
|
return relevant_text |
|
|
|
|
|
|
|
|
|
|
|
|
|
except Exception as e: |
|
|
|
logger.warning(f"Retrieval failed: {e}. Falling back to first chunk.") |
|
return chunks[0] |


if __name__ == "__main__":
    # Quick end-to-end check. This block sits at the bottom of the module so that
    # every tool referenced inside build_graph() is already defined when it runs.
    question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?"

    graph = build_graph(provider="groq")

    messages = [HumanMessage(content=question)]
    messages = graph.invoke({"messages": messages})
    for m in messages["messages"]:
        m.pretty_print()