import os
import gradio as gr
import requests, tempfile, base64, json, datetime, re, subprocess, mimetypes, fitz
import pandas as pd
from langchain.tools import tool
from langchain_huggingface import HuggingFaceEndpoint, ChatHuggingFace
from langchain.agents import initialize_agent, AgentType
from bs4 import BeautifulSoup
from langchain_openai import ChatOpenAI
from langchain_community.utilities import ArxivAPIWrapper
from youtube_transcript_api import YouTubeTranscriptApi
import yt_dlp
from PIL import Image
from transformers import pipeline
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# --- Environment variables ---
HF_ACCESS_KEY = os.getenv('HF_ACCESS_KEY')
WEATHER_API_KEY = os.getenv('WEATHER_API_KEY')
OPENAI_KEY = os.getenv('OPENAI_KEY')
OPENAI_MODEL = os.getenv('OPENAI_MODEL')
########## ----- DEFINING TOOLS ----- ##########

# --- TOOL 1: Web Search Tool (DuckDuckGo) ---
@tool
def current_events_news_search_tool(query: str) -> str:
    """
    General web search tool for current events, news, or trending topics not yet on Wikipedia.
    Returns relevant context and the source URL if available.
    """
    url = f"https://api.duckduckgo.com/?q={query}&format=json&no_html=1"
    try:
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        # Check the main answer fields first
        for key in ["AbstractText", "Answer", "Definition"]:
            if data.get(key):
                answer = data[key].strip()
                break
        else:
            answer = None
        # Try to extract more from RelatedTopics
        if not answer:
            related = data.get("RelatedTopics")
            if related and isinstance(related, list):
                for topic in related:
                    if isinstance(topic, dict) and topic.get("Text"):
                        answer = topic["Text"].strip()
                        # Optionally, add the URL
                        if topic.get("FirstURL"):
                            answer += f"\nSource: {topic['FirstURL']}"
                        break
        # Try to extract from Results
        if not answer:
            results = data.get("Results")
            if results and isinstance(results, list):
                for result in results:
                    if isinstance(result, dict) and result.get("Text"):
                        answer = result["Text"].strip()
                        if result.get("FirstURL"):
                            answer += f"\nSource: {result['FirstURL']}"
                        break
        # Fallback: return "no_answer"
        if answer:
            return answer
        return "no_answer"
    except Exception as e:
        return f"error: {e}"

# When you use the @tool decorator from langchain.tools, tool.name and tool.description are extracted automatically:
# tool.name is set to the function name (e.g. `current_events_news_search_tool`), and
# tool.description is set to the function's docstring (the triple-quoted string right under `def ...`).
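# Illustrative sketch (hypothetical tool, not registered with the agent), kept
# commented out; it only demonstrates what the decorator exposes:
# @tool
# def echo(text: str) -> str:
#     """Return the input text unchanged."""
#     return text
# # echo.name == "echo"; echo.description == "Return the input text unchanged."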
# --- TOOL 3: Calculator Tool ---
@tool
def calculator(expression: str) -> str:
    """Evaluate simple arithmetic expressions."""
    try:
        allowed = "0123456789+-*/(). "
        if not all(c in allowed for c in expression):
            return "error"
        result = eval(expression, {"__builtins__": None}, {})
        return str(result)
    except Exception:
        return "error"
# --- TOOL 6: Wikipedia Summary Tool ---
@tool
def wikipedia_and_generalknowledge_search(query: str) -> str:
    """
    Answer questions about general knowledge, world information, facts, sports, the Olympics, history, etc.
    Scrapes the most relevant Wikipedia page and returns its text as context for the LLM.
    """
    # Step 1: Search Wikipedia for the most relevant page
    search_url = "https://en.wikipedia.org/w/api.php"
    params = {
        "action": "query",
        "list": "search",
        "srsearch": query,
        "format": "json"
    }
    try:
        resp = requests.get(search_url, params=params, timeout=150)
        resp.raise_for_status()
        results = resp.json().get("query", {}).get("search", [])
        if not results:
            return "no_answer"
        page_title = results[0]["title"]
        page_url = f"https://en.wikipedia.org/wiki/{page_title.replace(' ', '_')}"
    except Exception:
        return "error: Could not search Wikipedia"
    # Step 2: Fetch the Wikipedia page and extract the main text
    try:
        page_resp = requests.get(page_url, timeout=120)
        page_resp.raise_for_status()
        soup = BeautifulSoup(page_resp.text, "html.parser")
        output = f"Source: {page_url}\n"
        # Extract main text from all paragraphs
        paragraphs = soup.find_all("p")
        text = " ".join(p.get_text(separator=" ", strip=True) for p in paragraphs)
        # Limit to the first 3000 characters for brevity
        output += text[:3000] if text else "No textual content found."
        return output
    except Exception as e:
        return f"error: {e}"
# --- TOOL 9: Image Captioning Tool ---
@tool
def image_caption(image_url: str) -> str:
    """Generate a descriptive caption for an image given its URL."""
    api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
    headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"}
    payload = {"inputs": image_url}
    try:
        resp = requests.post(api_url, headers=headers, json=payload, timeout=120)
        resp.raise_for_status()
        data = resp.json()
        return data[0]["generated_text"] if isinstance(data, list) else data.get("generated_text", "no_caption")
    except Exception:
        return "error"
# --- TOOL 10: Optical Character Recognition (OCR) Tool ---
@tool
def ocr_image(image_url: str) -> str:
    """
    Extracts all readable text from an image using Hugging Face TrOCR (microsoft/trocr-base-stage1).
    Input: URL to an image (e.g., PNG or JPG).
    Output: recognized text string.
    """
    api_url = "https://api-inference.huggingface.co/models/microsoft/trocr-base-stage1"
    headers = {
        "Authorization": f"Bearer {HF_ACCESS_KEY}",
        "Content-Type": "application/json"
    }
    payload = {"inputs": image_url}
    try:
        resp = requests.post(api_url, headers=headers, json=payload, timeout=60)
        resp.raise_for_status()
        data = resp.json()
        return data[0]["generated_text"]
    except Exception as e:
        return f"OCR error: {e}"
# --- TOOL 11: Image Classification Tool ---
@tool
def clasify_describe_image(image_url: str) -> str:
    """
    Generates a caption describing the contents of an image using Hugging Face (ViT-GPT2).
    Use this tool to identify the main subject of an image so that an LLM can use it to answer further.
    Input: image URL
    Output: a caption like 'A golden retriever lying on a couch.'
    """
    api_url = "https://api-inference.huggingface.co/models/nlpconnect/vit-gpt2-image-captioning"
    headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"}
    try:
        img_resp = requests.get(image_url, timeout=120)
        img_resp.raise_for_status()
        image_bytes = img_resp.content
        response = requests.post(api_url, headers=headers, data=image_bytes, timeout=60)
        response.raise_for_status()
        result = response.json()
        return result[0]["generated_text"] if isinstance(result, list) else "no_caption"
    except Exception as e:
        return f"caption error: {e}"
# --- TOOL 12: Web Scraping Tool ---
@tool
def URL_scrape_tool(url: str) -> str:
    """
    Scrape the main textual content from a given website URL and return the text,
    to be used as context by the model.
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (compatible; WebScrapeTool/1.0)"
        }
        resp = requests.get(url, headers=headers, timeout=120)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")
        # Try to extract main content from common tags
        paragraphs = soup.find_all("p")
        text = " ".join(p.get_text() for p in paragraphs)
        # Limit to the first 4000 characters for brevity
        return text[:4000] if text else "No textual content found."
    except Exception as e:
        return f"error: {e}"
# --- TOOL 13: Audio to Text Transcription Tool ---
@tool
def audio_url_to_text(audio_url: str) -> str:
    """
    Transcribe speech from an audio file URL to text using Hugging Face's Whisper model.
    Input: a direct link to an audio file (e.g., .mp3, .wav).
    Output: the transcribed text.
    """
    api_url = "https://api-inference.huggingface.co/models/openai/whisper-large-v3"
    headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"}
    try:
        # Download the audio file
        audio_resp = requests.get(audio_url, timeout=120)
        audio_resp.raise_for_status()
        audio_bytes = audio_resp.content
        # Encode the audio as base64 for the API
        audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
        payload = {
            "inputs": audio_b64,
            "parameters": {"return_timestamps": False}
        }
        resp = requests.post(api_url, headers=headers, json=payload, timeout=120)
        resp.raise_for_status()
        data = resp.json()
        return data.get("text", "no_answer")
    except Exception as e:
        return f"error: {e}"
# --- TOOL 14: Python Code Executor Tool ---
@tool
def python_executor(code: str) -> str:
    """
    Safely execute simple Python code and return the result if the code is in the question.
    If the question has a .py file attached, use the 'python_excel_audio_video_attached_file_tool' tool first.
    Only supports expressions and basic statements (no imports, file I/O, or system access).
    """
    try:
        # Restrict built-ins for safety
        allowed_builtins = {"abs": abs, "min": min, "max": max, "sum": sum, "len": len, "range": range}
        # Only allow expressions, not statements
        result = eval(code, {"__builtins__": allowed_builtins}, {})
        return str(result)
    except Exception as e:
        return f"error: {e}"
# --- TOOL 15: Attachment Processing Tool ---
@tool
def python_excel_audio_video_attached_file_tool(input_str: str) -> str:
    """
    Accepts a JSON string with one of:
      • 'file_bytes' : base-64-encoded bytes (existing behaviour)
      • 'file_path'  : local absolute/relative path to a file
      • 'file_url'   : downloadable URL (e.g. a Hugging Face dataset link)
    Keys (at least one of bytes / path / url is required):
      • filename   (str)           – original name with extension
      • file_bytes (str, base-64)  – optional
      • file_path  (str)           – optional
      • file_url   (str)           – optional
    Returns: a textual summary / preview ready for the LLM.
    """
    # ---------- 1. Parse JSON ------------------------------------------------
    try:
        # Robustly pull out the first {...} block even if extra tokens surround it
        match = re.search(r'(\{.*\})', input_str, re.DOTALL)
        payload = json.loads(match.group(1) if match else input_str)
    except Exception as e:
        return f"error: Could not parse JSON → {e}"
    filename = payload.get("filename")
    b64_data = payload.get("file_bytes")
    file_path = payload.get("file_path")
    file_url = payload.get("file_url")
    if not filename:
        return "error: 'filename' is required."
    # ---------- 2. Acquire raw bytes ----------------------------------------
    try:
        if b64_data:                                   # inline bytes
            file_bytes = base64.b64decode(b64_data)
        elif file_path and os.path.exists(file_path):  # local path
            with open(file_path, "rb") as f:
                file_bytes = f.read()
        elif file_url:                                 # remote URL
            # stream to avoid loading huge files into memory at once
            r = requests.get(file_url, timeout=60, stream=True)
            r.raise_for_status()
            file_bytes = r.content
        else:
            return "error: Provide 'file_bytes', 'file_path', or 'file_url'."
    except Exception as e:
        return f"error: Could not load file → {e}"
    # Detect the file type
    mime_type, _ = mimetypes.guess_type(filename)
    # Fallback for common extensions if guess_type fails
    if not mime_type:
        ext = filename.lower()
        mime_type = (
            "text/x-python" if ext.endswith(".py") else
            "text/csv" if ext.endswith(".csv") else
            "application/vnd.ms-excel" if ext.endswith((".xls", ".xlsx")) else
            None
        )
    if not mime_type:
        return "error: Could not determine file type. Skip the file."
    # Handle audio files
    if mime_type.startswith("audio"):
        api_url = "https://api-inference.huggingface.co/models/openai/whisper-large-v3"
        headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"}
        files = {"file": (filename, file_bytes)}
        try:
            resp = requests.post(api_url, headers=headers, files=files, timeout=120)
            resp.raise_for_status()
            data = resp.json()
            transcript = data.get("text", "")
            if transcript:
                return f"Transcript of the audio: {transcript}"
            else:
                return "error: No transcript returned."
        except Exception as e:
            return f"error: {e}"
    # Handle image files
    elif mime_type.startswith("image"):
        api_url = "https://api-inference.huggingface.co/models/nlpconnect/vit-gpt2-image-captioning"
        headers = {"Authorization": f"Bearer {os.getenv('HF_ACCESS_KEY', '')}"}
        try:
            resp = requests.post(api_url, headers=headers, data=file_bytes, timeout=60)
            resp.raise_for_status()
            result = resp.json()
            if isinstance(result, list) and result and "generated_text" in result[0]:
                caption = result[0]["generated_text"]
            else:
                caption = "no_caption"
            # Also include base-64 so the LLM can refer to the raw image
            b64 = base64.b64encode(file_bytes).decode()
            return f"Image caption: {caption}\nAttached image (base64): {b64}"
        except Exception as e:
            return f"caption error: {e}"
    # Handle video files (extract the audio track, then transcribe it)
    elif mime_type.startswith("video"):
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix="." + filename.split('.')[-1]) as tmp_video:
                tmp_video.write(file_bytes)
                tmp_video.flush()
                video_path = tmp_video.name
            audio_path = video_path + ".wav"
            subprocess.run([
                "ffmpeg", "-i", video_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", audio_path
            ], check=True)
            with open(audio_path, "rb") as f:
                audio_bytes = f.read()
            api_url = "https://api-inference.huggingface.co/models/openai/whisper-large-v3"
            headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"}
            files = {"file": ("audio.wav", audio_bytes)}
            resp = requests.post(api_url, headers=headers, files=files, timeout=120)
            resp.raise_for_status()
            data = resp.json()
            transcript = data.get("text", "")
            if transcript:
                return f"Transcript of the video audio: {transcript}"
            else:
                return "error: No transcript returned from video audio."
        except Exception as e:
            return f"error: {e}"
# Handle Excel files (.xls, .xlsx, .csv) | |
elif mime_type in ["application/vnd.ms-excel", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "text/csv"]: | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix=filename.split('.')[-1]) as tmp_excel: | |
tmp_excel.write(file_bytes) | |
tmp_excel.flush() | |
excel_path = tmp_excel.name | |
if filename.lower().endswith(".csv"): | |
df = pd.read_csv(excel_path) | |
preview = df.head(500).to_csv(index=False) | |
return f"CSV file preview (first 5 rows):\n{preview}" | |
else: | |
xl = pd.ExcelFile(excel_path) | |
sheet_names = xl.sheet_names | |
preview = "" | |
for sheet in sheet_names: | |
df = xl.parse(sheet) | |
preview += f"\nSheet: {sheet}\n{df.head(500).to_csv(index=False)}" | |
return f"Excel file sheets: {sheet_names}\nPreview (first 3 rows per sheet):{preview}" | |
except Exception as e: | |
return f"error: {e}" | |
# Handle Python files (.py) | |
elif mime_type == "text/x-python" or filename.lower().endswith(".py"): | |
try: | |
code = file_bytes.decode("utf-8", errors="replace") | |
lines = code.splitlines() | |
preview = "\n".join(lines[:40]) | |
return f"Python file preview (first 40 lines):\n{preview}" | |
except Exception as e: | |
return f"error: {e}" | |
else: | |
return "error: Unsupported file type. Please skip the file usage." | |
# --- TOOL 16: Research Paper Info Extraction Tool ---
@tool
def research_paper_search(query: str) -> str:
    """
    Search arXiv for journal/research/technical papers matching a query.
    Returns the top results, including title, authors, abstract, and PDF link.
    """
    wrapper = ArxivAPIWrapper(
        top_k_results=2,            # how many papers to return
        doc_content_chars_max=2000  # max characters of abstract to show
    )
    results_text = wrapper.run(query)
    return results_text
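# Example (illustrative): research_paper_search.invoke("attention is all you need")
# returns roughly one block per paper: "Published: ... Title: ... Authors: ... Summary: ...".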
# --- TOOL 17: Tool for sports, awards, competitions, etc. ---
@tool
def sports_awards_historicalfacts_tool(query: str) -> str:
    """
    For questions about sports, awards, competitions, historical facts, or other generic
    Wikipedia-available data, this tool fetches relevant context from Wikipedia.
    """
    # Step 1: Search Wikipedia for the most relevant page
    search_url = "https://en.wikipedia.org/w/api.php"
    params = {
        "action": "query",
        "list": "search",
        "srsearch": query,
        "format": "json"
    }
    try:
        resp = requests.get(search_url, params=params, timeout=150)
        resp.raise_for_status()
        results = resp.json().get("query", {}).get("search", [])
        if not results:
            return "no_answer"
        page_title = results[0]["title"]
        page_url = f"https://en.wikipedia.org/wiki/{page_title.replace(' ', '_')}"
    except Exception:
        return "error: Could not search Wikipedia"
    # Step 2: Fetch the Wikipedia page and extract tables and lists
    try:
        page_resp = requests.get(page_url, timeout=150)
        page_resp.raise_for_status()
        soup = BeautifulSoup(page_resp.text, "html.parser")
        output = f"Source: {page_url}\n"
        # Extract all tables with relevant columns
        tables = soup.find_all("table", {"class": ["wikitable", "sortable"]})
        found_table = False
        for table in tables:
            table_str = str(table)
            if any(word in table_str.lower() for word in ["winner", "name", "year", "nationality", "country"]):
                try:
                    df = pd.read_html(table_str)[0]
                    output += "\n--- Extracted Table ---\n"
                    output += df.to_csv(index=False)
                    found_table = True
                except Exception:
                    continue
        # If there is no relevant table, extract lists (e.g., <ul> or <ol> with <li>)
        if not found_table:
            lists = soup.find_all(['ul', 'ol'])
            for lst in lists:
                items = lst.find_all('li')
                if len(items) > 2:  # Only consider lists with more than 2 items
                    output += "\n--- Extracted List ---\n"
                    for item in items:
                        text = item.get_text(separator=" ", strip=True)
                        output += f"{text}\n"
                    break  # Only include the first relevant list
        # Fallback: return the first paragraph if nothing else was found
        if not found_table and "--- Extracted List ---" not in output:
            first_p = soup.find("p")
            output += first_p.get_text(strip=True)[:500] if first_p else "no_answer"
        # Limit the output length for the LLM context
        return output[:3500]
    except Exception as e:
        return f"error: {e}"
# --- TOOL 18: YouTube Transcript Tool ---
@tool
def youtube_transcript_tool(video_url: str) -> str:
    """
    Get the transcript (if available) for a YouTube video without downloading audio.
    Works only if subtitles or auto-captions exist.
    """
    try:
        # Extract the video ID
        match = re.search(r"(?:v=|youtu\.be/)([a-zA-Z0-9_-]{11})", video_url)
        if not match:
            return "Invalid YouTube URL."
        video_id = match.group(1)
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        full_text = " ".join(chunk['text'] for chunk in transcript)
        return full_text[:5000]  # truncate to keep the LLM input manageable
    except Exception as e:
        return f"Transcript error: {e}"
# --- TOOL 19: Video/Audio URL Transcript Tool ---
@tool
def video_url_to_transcript_tool(media_url: str) -> str:
    """
    Given a URL to a video or audio file (YouTube, direct .mp4/.mp3/.wav, etc.),
    download the audio and return a transcript.
    """
    api_url = "https://api-inference.huggingface.co/models/openai/whisper-large-v3"
    headers = {"Authorization": f"Bearer {HF_ACCESS_KEY}"}
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            audio_path = None
            # Check whether it's a YouTube URL
            if "youtube.com" in media_url or "youtu.be" in media_url:
                ydl_opts = {
                    'format': 'bestaudio/best',
                    'outtmpl': f'{tmpdir}/audio.%(ext)s',
                    'quiet': True,
                    'noplaylist': True,
                    'extractaudio': True,
                    'audioformat': 'wav',
                    'postprocessors': [{
                        'key': 'FFmpegExtractAudio',
                        'preferredcodec': 'wav',
                        'preferredquality': '192',
                    }],
                }
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(media_url, download=True)
                    audio_path = ydl.prepare_filename(info).rsplit('.', 1)[0] + '.wav'
            else:
                # Download the media file directly
                resp = requests.get(media_url, timeout=120)
                resp.raise_for_status()
                # Guess the extension
                ext = media_url.split('?')[0].split('.')[-1].lower()
                if ext not in ["mp3", "wav", "m4a", "mp4"]:
                    ext = "mp3"
                file_path = os.path.join(tmpdir, f"audio.{ext}")
                with open(file_path, "wb") as f:
                    f.write(resp.content)
                # If it's a video, extract the audio track using ffmpeg
                if ext in ["mp4", "mkv", "webm"]:
                    audio_path = os.path.join(tmpdir, "audio.wav")
                    subprocess.run([
                        "ffmpeg", "-i", file_path, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", audio_path
                    ], check=True)
                else:
                    audio_path = file_path
            # Read the audio bytes
            with open(audio_path, "rb") as f:
                audio_bytes = f.read()
        # Encode the audio as base64 for the API
        audio_b64 = base64.b64encode(audio_bytes).decode("utf-8")
        payload = {
            "inputs": audio_b64,
            "parameters": {"return_timestamps": False}
        }
        resp = requests.post(api_url, headers=headers, json=payload, timeout=120)
        resp.raise_for_status()
        data = resp.json()
        return data.get("text", "no_answer")
    except Exception as e:
        return f"error: {e}"
# --- TOOL 20: Video Object Counting Tool ---
@tool
def max_object_in_video(video_url: str, object_label: str = "bird") -> str:
    """
    Given a video URL and an object label, extracts frames and uses an object
    detection model to count the specified object in each frame.
    Returns the maximum number of such objects detected in any single frame.
    Example: max_object_in_video("https://...", "car") -> "Maximum car count in a single frame: 4"
    """
    # Download the video
    try:
        resp = requests.get(video_url, timeout=120)
        resp.raise_for_status()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video:
            tmp_video.write(resp.content)
            tmp_video.flush()
            video_path = tmp_video.name
    except Exception as e:
        return f"error: Could not download video: {e}"
    # Extract one frame every 2 seconds (adjust as needed)
    frames_dir = tempfile.mkdtemp()
    frame_pattern = os.path.join(frames_dir, "frame_%04d.jpg")
    try:
        subprocess.run([
            "ffmpeg", "-i", video_path, "-vf", "fps=0.5", frame_pattern
        ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except Exception as e:
        return f"error: Could not extract frames: {e}"
    # Load the object detection pipeline
    try:
        detector = pipeline("object-detection", model="facebook/detr-resnet-50")
    except Exception as e:
        return f"error: Could not load detection model: {e}"
    max_count = 0
    for fname in sorted(os.listdir(frames_dir)):
        fpath = os.path.join(frames_dir, fname)
        try:
            image = Image.open(fpath)
            results = detector(image)
            count = sum(1 for obj in results if obj['label'].lower() == object_label.lower() and obj['score'] > 0.5)
            if count > max_count:
                max_count = count
        except Exception:
            continue
    # Clean up temporary files
    try:
        os.remove(video_path)
        for fname in os.listdir(frames_dir):
            os.remove(os.path.join(frames_dir, fname))
        os.rmdir(frames_dir)
    except Exception:
        pass
    return f"Maximum {object_label} count in a single frame: {max_count}"
'''
def extract_final_answer(output: str) -> str:
    # Try to extract the answer after [YOUR FINAL ANSWER] or Final Answer:
    match = re.search(r"\[YOUR FINAL ANSWER\]\s*(.+)", output)
    if match:
        return match.group(1).strip()
    match = re.search(r"Final Answer:\s*(.+)", output)
    if match:
        return match.group(1).strip()
    # Fallback: return the whole output if no match
    return output.strip()
'''
## --- Tool Discovery ---
# Every tool above is decorated with @tool, so tool.name and tool.description
# are available below. (A helper like get_all_tools() could auto-discover all
# decorated tools; here the list is maintained by hand.)
tools_list = [
    python_excel_audio_video_attached_file_tool,
    wikipedia_and_generalknowledge_search,
    # sports_awards_historicalfacts_tool,
    research_paper_search,
    python_executor,
    # get_weather,
    # calculator,
    # convert_units,
    # get_time,
    # get_date,
    # dictionary_lookup,
    # currency_convert,
    # image_caption,
    # audio_url_to_text,
    current_events_news_search_tool,
    ocr_image,
    clasify_describe_image,
    URL_scrape_tool,
    youtube_transcript_tool,
    # video_url_to_transcript_tool,
    max_object_in_video,
]
tool_descriptions = "\n".join(f"- {tool.name}: {tool.description}" for tool in tools_list)
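# Each tool contributes one line to tool_descriptions, e.g. (illustrative):
# - python_executor: Safely execute simple Python code and return the result ...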
## --
# --- System Prompt for the Agent ---
system_prompt = f"""
You are a general AI assistant that can answer questions about general knowledge and historical facts, and can also analyze audio, images, and videos. Think through the input question step by step and use tools if needed.
Use this reasoning format repeatedly:
Thought: (what you think is happening or what you want to do next)
Action: (the tool to use, if needed)
Action Input: (input to the tool)
Observation: (result of the tool call)
Repeat this process as needed. ONLY AFTER finishing your reasoning and/or tool use, provide YOUR FINAL ANSWER.
Your output should be just a number, string, or comma-separated list. Do not include your Thoughts, Actions, Observations, or any other commentary.
You also have access to a set of tools, which you can use to answer the question. The available tools are:
{tool_descriptions}
If the question is about sports, awards, historical facts, or a similar topic that can be answered from Wikipedia, use 'wikipedia_and_generalknowledge_search'.
If the question is about current events, news, or similar current-affairs topics, use 'current_events_news_search_tool' to fetch relevant page information and answer from it.
If a tool returns a long text, table, or list, extract only the most relevant information or data from which you can derive the answer, and return that as your final answer.
You must not use multiple tools in a single call. Don't hallucinate.
**Examples:**
Q: Which country had the least number of athletes at the 1928 Summer Olympics?
Your Output: Luxembourg
Q: What are the top 3 programming languages?
Your Output: Python, JavaScript, Java
If, even after 12 iterations, tool usage is not useful, try to answer directly from your own knowledge without hallucinating. If you cannot answer, just say "no_answer" as YOUR FINAL ANSWER.
"""
# If your final answer is something like 'there were 5 studio albums published between 2000 and 2009', modify YOUR FINAL ANSWER to: '5'
# If your final answer is something like 'b, e', YOUR FINAL ANSWER should be: 'b, e'
# For each question, follow this format:
# Question: the input question you must answer
# Thought: your reasoning about what to do next
# Action: the action to take; must be one of the tools. If there is no relevant tool, answer the question directly.
# Action Input: the input to the action
# Observation: the result of the action
# ... (repeat Thought/Action/Action Input/Observation as needed)
# Final Answer: the answer to the original question, as concise as possible (number, short string, or comma-separated list, no extra explanation).
# An earlier draft of the system prompt (kept for reference):
# system_prompt = f"""
# You are an intelligent assistant with access to the following tools:
# {tool_descriptions}
# For every question, you must do your internal reasoning using the Thought → Action → Observation → Answer process, but your output to the user should be ONLY the final answer as a single value (number, string, or comma-separated list), with no extra explanation, thoughts, actions, or observations.
# **If a tool returns a long text or description (such as from a web scraping tool), you must carefully read and process that output, extract ONLY the most relevant, concise answer to the user's question, and provide a single string as output. Do not return the full text or irrelevant details.**
# **Your output must be only the answer. Do not include any reasoning, tool calls, or explanations.**
# Examples:
# Q: What is 7 * (3 + 2)?
# Your Output: 35
# Q: What’s the weather in Tokyo?
# Your Output: 22
# Q: What is the capital of France?
# Your Output: Paris
# Q: Which year was Python 3.0 released as per the website https://en.wikipedia.org/wiki/Python_(programming_language)?
# (Tool returns a long description about Python.)
# Your Output: 2008
# Q: Convert 10 meters to feet.
# Your Output: 32.81
# Instructions:
# - Always do your internal reasoning (Thought → Action → Observation → Answer) before producing the answer, but DO NOT show this reasoning to the user.
# - Use a tool only if necessary, and don't use multiple tools in one call. Don't use a tool if you can answer directly.
# - Your output must be a single value (number, string, or comma-separated list) with no extra explanation or formatting.
# - If you cannot answer the question, or you couldn't process the input question, just answer "no_answer".
# - Be concise and accurate.
# """
## --- Initialize the Chat Model ---
# Hugging Face endpoint alternative (kept for reference):
'''
llm = HuggingFaceEndpoint(
    repo_id="meta-llama/Llama-3.3-70B-Instruct",
    # repo_id="Qwen/Qwen2.5-32B-Instruct",
    huggingfacehub_api_token=HF_ACCESS_KEY,
    # model_kwargs={'prompt': system_prompt}
    # system_prompt=system_prompt,
)
chat_llm = ChatHuggingFace(llm=llm)
'''
# Initialize the OpenAI chat model
chat_llm = ChatOpenAI(
    openai_api_key=OPENAI_KEY,
    model_name=OPENAI_MODEL,
    temperature=0.05,
    # max_tokens=10
)
# Initialize the agent with the tools and system prompt
agent = initialize_agent(
    tools=tools_list,
    llm=chat_llm,
    agent=AgentType.OPENAI_FUNCTIONS,  # alternative: AgentType.ZERO_SHOT_REACT_DESCRIPTION
    agent_kwargs={"system_message": system_prompt},
    verbose=True,
    max_iterations=15,        # increase as needed
    max_execution_time=4000,  # increase as needed
    early_stopping_method="generate",
    handle_parsing_errors=True,
    # return_intermediate_steps=False
)
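# Quick local smoke test (illustrative; commented out so the Space does not spend
# tokens at import time):
# if __name__ == "__main__":
#     print(agent.run("What is the capital of France?"))  # expected: Paris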
## --
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the agent on them, submits all answers,
    and displays the results.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # get the SPACE_ID for linking to the code
    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"
    """
    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # For an app running as a Hugging Face Space, this link points toward your codebase (useful for others, so please keep it public)
    """
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)
    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=120)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    # 3. Run the Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            # full_prompt = f"{system_prompt}\n Input Question: {question_text}"
            # submitted_answer = agent.run(full_prompt)
            submitted_answer = agent.run(question_text)
            '''
            if "YOUR FINAL ANSWER:" in submitted_answer:
                match = re.search(r"YOUR FINAL ANSWER:\s*(.+)", submitted_answer, re.IGNORECASE | re.DOTALL)
                scraped_answer = match.group(1).strip()
            else:
                scraped_answer = submitted_answer.strip()
            '''
            # submitted_answer = extract_final_answer(submitted_answer)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)
    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=120)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc.
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
        ---
        **Disclaimers:**
        Once you click the "submit" button, it can take quite some time (this is the time it takes the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to avoid the long-running submit button, you could cache the answers and submit them in a separate action, or even answer the questions asynchronously.
        """
    )
    gr.LoginButton()
    # login_btn = gr.LoginButton()
    # login_btn.activate()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    # Note: max_rows=10 was removed from the DataFrame constructor (no longer supported)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")  # get SPACE_ID at startup
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if space_id_startup:  # print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    print("-"*(60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    # Launch the Gradio app
    demo.launch(debug=True, share=True)