import os import gradio as gr import requests, tempfile, base64, json, datetime, re, subprocess, mimetypes, fitz import pandas as pd from langchain.tools import tool from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace from langchain.agents import initialize_agent, AgentType from bs4 import BeautifulSoup from langchain_openai import ChatOpenAI from langchain_community.utilities import ArxivAPIWrapper from youtube_transcript_api import YouTubeTranscriptApi import yt_dlp from PIL import Image from transformers import pipeline ## # Load environment variables from .env file # --- Constants --- DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" # Load the environment variables HF_ACCESS_KEY = os.getenv('HF_ACCESS_KEY') WEATHER_API_KEY = os.getenv('WEATHER_API_KEY') OPENAI_KEY = os.getenv('OPENAI_KEY') OPENAI_MODEL = os.getenv ('OPENAI_MODEL') ########## ----- DEFINING TOOLS -----########## # --- TOOL 1: Web Search Tool (DuckDuckGo) --- @tool def current_events_news_search_tool(query: str) -> str: """ General web search tool for current events, news, or trending topics not yet on Wikipedia. Returns relevant context and source URL if available. """ url = f"https://api.duckduckgo.com/?q={query}&format=json&no_html=1" try: resp = requests.get(url, timeout=30) resp.raise_for_status() data = resp.json() # Check main answer fields for key in ["AbstractText", "Answer", "Definition"]: if data.get(key): answer = data[key].strip() break else: answer = None # Try to extract more from RelatedTopics if not answer: related = data.get("RelatedTopics") if related and isinstance(related, list): for topic in related: if isinstance(topic, dict) and topic.get("Text"): answer = topic["Text"].strip() # Optionally, add the URL if topic.get("FirstURL"): answer += f"\nSource: {topic['FirstURL']}" break # Try to extract from Results if not answer: results = data.get("Results") if results and isinstance(results, list): for result in results: if isinstance(result, dict) and result.get("Text"): answer = result["Text"].strip() if result.get("FirstURL"): answer += f"\nSource: {result['FirstURL']}" break # Fallback: return "no_answer" if answer: return answer return "no_answer" except Exception as e: return f"error: {e}" # when you use the @tool decorator from langchain.tools, the tool.name and tool.description are automatically extracted from your function # tool.name is set to the function name (e.g., `search_tool`), and # tool.description is set to the docstring of the function (the triple-quoted string right under def ...) (e.g., "Answer general knowledge or current events queries using DuckDuckGo."). # --- TOOL 3: Calculator Tool --- @tool def calculator(expression: str) -> str: """Evaluate math expressions.""" try: allowed = "0123456789+-*/(). " if not all(c in allowed for c in expression): return "error" result = eval(expression, {"__builtins__": None}, {}) return str(result) except Exception: return "error" # --- TOOL 6: Wikipedia Summary Tool --- @tool def wikipedia_and_generalknowledge_search(query: str) -> str: """ Answer questions related to general knowledge, world information, facts, sports, olympics, history, etc. from Wikipedia by scraping the text and returns text as context for LLM to use. """ # Step 1: Search Wikipedia for the most relevant page search_url = "https://en.wikipedia.org/w/api.php" params = { "action": "query", "list": "search", "srsearch": query, "format": "json" } try: resp = requests.get(search_url, params=params, timeout=150) resp.raise_for_status() results = resp.json().get("query", {}).get("search", []) if not results: return "no_answer" page_title = results[0]["title"] page_url = f"https://en.wikipedia.org/wiki/{page_title.replace(' ', '_')}" except Exception: return "error: Could not search Wikipedia" # Step 2: Fetch the Wikipedia page and extract main text try: page_resp = requests.get(page_url, timeout=120) page_resp.raise_for_status() soup = BeautifulSoup(page_resp.text, "html.parser") output = f"Source: {page_url}\n" # Extract main text from all paragraphs paragraphs = soup.find_all("p") text = " ".join(p.get_text(separator=" ", strip=True) for p in paragraphs) # Limit to first 3000 characters for brevity output += text[:3000] if text else "No textual content found." return output except Exception as e: return f"error: {e}" # --- TOOL 9: Image Captioning Tool --- @tool def image_caption(image_url: str) -> str: """Generate a descriptive caption for an image given its URL.""" api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base" headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"} payload = {"inputs": image_url} try: resp = requests.post(api_url, headers=headers, json=payload, timeout=120) resp.raise_for_status() data = resp.json() return data[0]["generated_text"] if isinstance(data, list) else data.get("generated_text", "no_caption") except Exception: return "error" # --- TOOL 10: Optical Character Recognition (OCR) Tool --- @tool def ocr_image(image_url: str) -> str: """ Extracts all readable text from an image using HuggingFace TrOCR (microsoft/trocr-base-stage1). Input: URL to an image (e.g., PNG or JPG). Output: Recognized text string. """ api_url = "https://api-inference.huggingface.co/models/microsoft/trocr-base-stage1" headers = { "Authorization": f"Bearer {HF_ACCESS_KEY}", "Content-Type": "application/json" } payload = {"inputs": image_url} try: resp = requests.post(api_url, headers=headers, json=payload, timeout=60) resp.raise_for_status() data = resp.json() return data[0]["generated_text"] except Exception as e: return f"OCR error: {e}" # --- TOOL 11: Image Classification Tool --- @tool def clasify_describe_image(image_url: str) -> str: """ Generates a caption describing the contents of an image using HuggingFace (ViT-GPT2). Use this tool to identify the main subject of an image so that an LLM can use it to answer further. Input: image URL Output: caption like 'A golden retriever lying on a couch.' """ api_url = "https://api-inference.huggingface.co/models/nlpconnect/vit-gpt2-image-captioning" headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"} try: img_resp = requests.get(image_url, timeout=120) img_resp.raise_for_status() image_bytes = img_resp.content response = requests.post(api_url, headers=headers, data=image_bytes, timeout=60) response.raise_for_status() result = response.json() return result[0]["generated_text"] if isinstance(result, list) else "no_caption" except Exception as e: return f"caption error: {e}" # --- TOOL 12: Web Scraping Tool --- @tool def URL_scrape_tool(url: str) -> str: """ Scrape the main textual content from a given website URL and returns the text - to be used as context by model. """ try: headers = { "User-Agent": "Mozilla/5.0 (compatible; WebScrapeTool/1.0)" } resp = requests.get(url, headers=headers, timeout=120) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") # Try to extract main content from common tags paragraphs = soup.find_all("p") text = " ".join(p.get_text() for p in paragraphs) # Limit to first 2000 characters for brevity return text[:4000] if text else "No textual content found." except Exception as e: return f"error: {e}" # --- TOOL 13: Audio to Text Transcription Tool --- @tool def audio_url_to_text(audio_url: str) -> str: """ Transcribe speech from an audio file URL to text using Hugging Face's Whisper model. Input: A direct link to an audio file (e.g., .mp3, .wav). Output: The transcribed text. """ api_url = "https://api-inference.huggingface.co/models/openai/whisper-large-v3" headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"} try: # Download the audio file audio_resp = requests.get(audio_url, timeout=120) audio_resp.raise_for_status() audio_bytes = audio_resp.content # Encode audio as base64 for API audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") payload = { "inputs": audio_b64, "parameters": {"return_timestamps": False} } resp = requests.post(api_url, headers=headers, json=payload, timeout=120) resp.raise_for_status() data = resp.json() return data.get("text", "no_answer") except Exception as e: return f"error: {e}" # --- TOOL 14: Python Code Executor Tool --- @tool def python_executor(code: str) -> str: """ Safely execute simple Python code and return the result if the code is in the question. If the question has .py file attached, use 'python_excel_audio_video_attached_file_tool' tool first. Only supports expressions and basic statements (no imports, file I/O, or system access). """ try: # Restrict built-ins for safety allowed_builtins = {"abs": abs, "min": min, "max": max, "sum": sum, "len": len, "range": range} # Only allow expressions, not statements result = eval(code, {"__builtins__": allowed_builtins}, {}) return str(result) except Exception as e: return f"error: {e}" # --- TOOL 15: Attachment Processing Tool --- @tool def python_excel_audio_video_attached_file_tool(input_str: str) -> str: """ Accepts a JSON string with one of: • 'file_bytes' : base-64–encoded bytes (existing behaviour) • 'file_path' : local absolute/relative path to a file • 'file_url' : downloadable URL (e.g. Hugging Face dataset link) Keys (at least one bytes / path / url required): • filename (str) – original name with extension • file_bytes (str, base-64) – optional • file_path (str) – optional • file_url (str) – optional Returns: textual summary / preview ready for the LLM. """ # ---------- 1. Parse JSON ------------------------------------------------ try: # Robustly pull out the first {...} block even if extra tokens are around it match = re.search(r'(\{.*\})', input_str, re.DOTALL) payload = json.loads(match.group(1) if match else input_str) except Exception as e: return f"error: Could not parse JSON → {e}" filename = payload.get("filename") b64_data = payload.get("file_bytes") file_path = payload.get("file_path") file_url = payload.get("file_url") if not filename: return "error: 'filename' is required." # ---------- 2. Acquire raw bytes ---------------------------------------- try: if b64_data: # inline bytes file_bytes = base64.b64decode(b64_data) elif file_path and os.path.exists(file_path): # local path with open(file_path, "rb") as f: file_bytes = f.read() elif file_url: # remote URL # stream to avoid loading huge files into memory at once r = requests.get(file_url, timeout=60, stream=True) r.raise_for_status() file_bytes = r.content else: return "error: Provide 'file_bytes', 'file_path', or 'file_url'." except Exception as e: return f"error: Could not load file → {e}" # Detect file type mime_type, _ = mimetypes.guess_type(filename) # fallback for common extensions if guess_type fails if not mime_type: ext = filename.lower() mime_type = ( "text/x-python" if ext.endswith(".py") else "text/csv" if ext.endswith(".csv") else "application/vnd.ms-excel" if ext.endswith((".xls", ".xlsx")) else None ) if not mime_type: return "error: Could not determine file type. Skip the file." # Handle audio files if mime_type.startswith("audio"): api_url = "https://api-inference.huggingface.co/models/openai/whisper-large-v3" headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"} files = {"file": (filename, file_bytes)} try: resp = requests.post(api_url, headers=headers, files=files, timeout=120) resp.raise_for_status() data = resp.json() transcript = data.get("text", "") if transcript: return f"Transcript of the audio: {transcript}" else: return "error: No transcript returned." except Exception as e: return f"error: {e}" # Handle image files elif mime_type.startswith("image"): # image_b64 = base64.b64encode(file_bytes).decode() api_url = "https://api-inference.huggingface.co/models/nlpconnect/vit-gpt2-image-captioning" headers = {"Authorization": f"Bearer {os.getenv('HF_ACCESS_KEY', '')}"} try: resp = requests.post(api_url, headers=headers, data=file_bytes, timeout=60) resp.raise_for_status() result = resp.json() if isinstance(result, list) and result and "generated_text" in result[0]: caption = result[0]["generated_text"] else: caption = "no_caption" # Optionally also include base-64 so the LLM can refer to the raw image b64 = base64.b64encode(file_bytes).decode() return f"Image caption: {caption}\nAttached image (base64): {b64}" except Exception as e: return f"caption error: {e}" return f"Attached image (base64): {image_b64}" # Handle video files (extract audio, then transcribe) elif mime_type.startswith("video"): try: with tempfile.NamedTemporaryFile(delete=False, suffix=filename.split('.')[-1]) as tmp_video: tmp_video.write(file_bytes) tmp_video.flush() video_path = tmp_video.name audio_path = video_path + ".wav" # import subprocess subprocess.run([ "ffmpeg", "-i", video_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", audio_path ], check=True) with open(audio_path, "rb") as f: audio_bytes = f.read() api_url = "https://api-inference.huggingface.co/models/openai/whisper-large-v3" headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"} files = {"file": ("audio.wav", audio_bytes)} resp = requests.post(api_url, headers=headers, files=files, timeout=120) resp.raise_for_status() data = resp.json() transcript = data.get("text", "") if transcript: return f"Transcript of the video audio: {transcript}" else: return "error: No transcript returned from video audio." except Exception as e: return f"error: {e}" # Handle Excel files (.xls, .xlsx, .csv) elif mime_type in ["application/vnd.ms-excel", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "text/csv"]: try: with tempfile.NamedTemporaryFile(delete=False, suffix=filename.split('.')[-1]) as tmp_excel: tmp_excel.write(file_bytes) tmp_excel.flush() excel_path = tmp_excel.name if filename.lower().endswith(".csv"): df = pd.read_csv(excel_path) preview = df.head(500).to_csv(index=False) return f"CSV file preview (first 5 rows):\n{preview}" else: xl = pd.ExcelFile(excel_path) sheet_names = xl.sheet_names preview = "" for sheet in sheet_names: df = xl.parse(sheet) preview += f"\nSheet: {sheet}\n{df.head(500).to_csv(index=False)}" return f"Excel file sheets: {sheet_names}\nPreview (first 3 rows per sheet):{preview}" except Exception as e: return f"error: {e}" # Handle Python files (.py) elif mime_type == "text/x-python" or filename.lower().endswith(".py"): try: code = file_bytes.decode("utf-8", errors="replace") lines = code.splitlines() preview = "\n".join(lines[:40]) return f"Python file preview (first 40 lines):\n{preview}" except Exception as e: return f"error: {e}" else: return "error: Unsupported file type. Please skip the file usage." # --- TOOL 16: Research Paper Info Extraction Tool --- @tool def research_paper_search(query: str) -> str: """ Search arXiv for journals/research/technical papers matching a query. Returns top results including title, authors, abstract, and PDF link. """ wrapper = ArxivAPIWrapper( top_k_results=2, # how many papers to return doc_content_chars_max=2000 # max chars of abstract to show ) results_text = wrapper.run(query) return results_text # --- TOOL 17:Tool for sports, awards, competitions etc. --- @tool def sports_awards_historicalfacts_tool(query: str) -> str: """ For questions about sports, awards, competitions, historical facts, or generic wikipedia available data, this tool fetches relevant context from Wikipedia. """ # Step 1: Search Wikipedia for the most relevant page search_url = "https://en.wikipedia.org/w/api.php" params = { "action": "query", "list": "search", "srsearch": query, "format": "json" } try: resp = requests.get(search_url, params=params, timeout=150) resp.raise_for_status() results = resp.json().get("query", {}).get("search", []) if not results: return "no_answer" page_title = results[0]["title"] page_url = f"https://en.wikipedia.org/wiki/{page_title.replace(' ', '_')}" except Exception: return "error: Could not search Wikipedia" # Step 2: Fetch the Wikipedia page and extract tables and lists try: page_resp = requests.get(page_url, timeout=150) page_resp.raise_for_status() soup = BeautifulSoup(page_resp.text, "html.parser") output = f"Source: {page_url}\n" # Extract all tables with relevant columns tables = soup.find_all("table", {"class": ["wikitable", "sortable"]}) found_table = False for table in tables: table_str = str(table) if any(word in table_str.lower() for word in ["winner", "name", "year", "nationality", "country"]): try: df = pd.read_html(table_str)[0] output += "\n--- Extracted Table ---\n" output += df.to_csv(index=False) found_table = True except Exception: continue # If no relevant table, extract lists (e.g.,