Spaces:
Sleeping
Sleeping
import concurrent | |
import os | |
import time | |
from concurrent import futures | |
from pathlib import Path | |
from typing import Any, List, Dict, Tuple | |
import pandas as pd | |
import requests | |
from dotenv import dotenv_values, load_dotenv | |
from openai import AzureOpenAI, RateLimitError | |
from smolagents import tool | |
from tqdm.auto import tqdm | |
from smolagents import GoogleSearchTool | |
import requests | |
import urllib.request | |
from markdownify import markdownify as md | |
from bs4 import BeautifulSoup | |
import json | |
# Base URL of the scoring service that serves the questions and accepts answers.
test_api_base = "https://agents-course-unit4-scoring.hf.space"

# ANSI escape sequences used to colour console output.
GRAY, BOLD, RESET = "\033[90m", "\033[1m", "\033[0m"

# Configuration: pull settings from a local .env file into the environment
# before any of them are read below.
load_dotenv()

client = AzureOpenAI(
    api_key=os.environ.get("AZURE_OPENAI_API_KEY"),
    azure_endpoint=os.environ.get("AZURE_OPENAI_API_BASE"),
    api_version=os.environ.get("AZURE_OPENAI_API_VERSION"),
)
# Name of the chat model passed as ``model=`` on every completion request.
openai_chatmodel = os.environ.get("AZURE_OPENAI_CHAT_MODEL")
# Load questions: fetch them once at start-up and fail fast on any HTTP error.
questions_url = f"{test_api_base}/questions"
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
# Tabular view of the same payload.
df = pd.DataFrame(questions_data)
# Define tools & agent | |
def read_file(file_path_str: str) -> str:
    """
    A tool that reads the contents of a file and returns them as text.

    Plain-text formats (.txt, .csv, .py) are returned verbatim, decoded as
    UTF-8. An .xlsx workbook is loaded with ``pd.read_excel`` and rendered with
    ``DataFrame.to_string``. The extension is matched case-insensitively, so
    ``NOTES.TXT`` is handled exactly like ``notes.txt``.

    Args:
        file_path_str: The path to the file that should be read.

    Returns:
        The file contents as text, or the literal string
        "Unsupported file type" when the extension is not supported.

    Raises:
        ValueError: If the path does not exist or is not a regular file.
    """
    file_path = Path(file_path_str).resolve()
    # is_file() is already False for a missing path, so one check covers both.
    if not file_path.is_file():
        raise ValueError(f"File {file_path} does not exist or is not a file.")

    # Normalise the case so upper-case extensions are not rejected.
    suffix = file_path.suffix.lower()
    if suffix in {".txt", ".csv", ".py"}:
        return file_path.read_text(encoding="utf-8")
    if suffix == ".xlsx":
        return pd.read_excel(file_path).to_string()
    return "Unsupported file type"
def get_search_results_for(query):
    """
    Query DuckDuckGo's HTML endpoint and return the result links.

    Args:
        query: The search terms to look up.

    Returns:
        A list of ``{"title": ..., "url": ...}`` dicts, one for every
        ``a.result__a`` anchor in the response page, in page order.

    Raises:
        urllib.error.URLError: If the request fails (a timeout may also
            surface as a timeout error while the body is being read).
    """
    # Function-scope import: the file only imports ``urllib.request`` and
    # relied on it pulling in ``urllib.parse`` as a side effect.
    import urllib.parse

    # urlencode() already produces "q=<escaped query>", so it goes directly
    # after "?". Prefixing another "q=" would send the search terms as
    # "q=<query>" instead of "<query>".
    query_string = urllib.parse.urlencode({'q': query})
    url = f'https://html.duckduckgo.com/html?{query_string}'
    request = urllib.request.Request(url)
    request.add_header('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36')
    # The context manager closes the connection; the timeout keeps a stalled
    # server from blocking the caller forever.
    with urllib.request.urlopen(request, timeout=15) as raw_response:
        html = raw_response.read().decode("utf-8")
    soup = BeautifulSoup(html, 'html.parser')
    return [
        {"title": anchor.text, "url": anchor.attrs['href']}
        for anchor in soup.select("a.result__a")
    ]
# Single shared search tool instance, created once at import time.
search_tool = GoogleSearchTool("serper")


def get_google_search_results_for(query: str):
    """Forward *query* to the shared ``search_tool`` and return its output unchanged."""
    search_output = search_tool.forward(query)
    return search_output
def load_page_content(url) -> str:
    """
    Download a web page and return its content converted to Markdown.

    Args:
        url: Address of the page to fetch.

    Returns:
        The response body, decoded as UTF-8 and passed through ``markdownify``.
        Bytes that are not valid UTF-8 are replaced instead of aborting the
        whole load.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    response = requests.get(
        url,
        headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36'},
        # Without a timeout a single unresponsive host blocks the agent forever.
        timeout=30,
    )
    # Pages in other encodings used to raise UnicodeDecodeError here; keep the
    # UTF-8 decoding but degrade gracefully on undecodable bytes.
    page_content = response.content.decode('utf-8', errors='replace')
    return md(page_content)
def _single_string_param_tool(name, description, param_name, param_description):
    """Build a strict function-tool schema that takes one required string argument."""
    parameters = {
        "type": "object",
        "properties": {
            param_name: {
                "type": "string",
                "description": param_description,
            },
        },
        "required": [param_name],
        "additionalProperties": False,
    }
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": parameters,
            "strict": True,
        },
    }


# Tool schemas advertised to the chat model on every completion request.
# NOTE(review): the first description mentions DuckDuckGo, while call_function
# routes this tool name to get_google_search_results_for - confirm which is intended.
tools = [
    _single_string_param_tool(
        "get_search_results_for",
        "Returns the top 10 results for a DuckDuckGo query.",
        "query",
        "query to search for on DuckDuckGo",
    ),
    _single_string_param_tool(
        "load_page_content",
        "Returns the content of a particular webpage.",
        "url",
        "Url of the webpage for which to retrieve the content",
    ),
]
def call_function(name, args):
    """
    Execute the tool called *name* with keyword arguments *args*.

    Returns the tool's result, or ``None`` when *name* is not a known tool.
    """
    # Unknown tool names are not an error here; the caller receives None.
    if name not in ("get_search_results_for", "load_page_content"):
        return None

    # The search tool name is served by the Google-backed implementation.
    if name == "get_search_results_for":
        handler = get_google_search_results_for
    else:
        handler = load_page_content
    return handler(**args)
def _create_completion_with_retry(messages, max_attempts: int = 10):
    """Request one chat completion, backing off and retrying on rate limits.

    Re-raises the last ``RateLimitError`` once *max_attempts* requests have
    failed, so the caller never continues without a fresh completion.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return client.chat.completions.create(
                model=openai_chatmodel,
                messages=messages,
                tools=tools,
            )
        except RateLimitError:
            if attempt == max_attempts:
                raise
            # Linear backoff: 10s, 20s, ... The message reports the real delay.
            delay = attempt * 10
            print(f"{GRAY}Rate limit exceeded, waiting for {delay} seconds...{RESET}")
            time.sleep(delay)


def run_agent(task: str):
    """
    Answer *task* with a tool-calling chat loop and return the final answer.

    The model is called repeatedly. Each time it asks for tool calls, they are
    executed through ``call_function`` and their results appended to the
    conversation. The loop ends when the model finishes with reason "stop".

    Args:
        task: The question to put to the model.

    Returns:
        The text following the last "FINAL ANSWER:" marker in the model's
        reply, stripped of surrounding whitespace (the whole reply if the
        marker is absent).

    Raises:
        RateLimitError: If the API is still rate limited after all retries.
        RuntimeError: If the model stops for a reason other than "stop" or
            "tool_calls".
    """
    messages = [
        {
            "role": "system",
            "content": "You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string."
        },
        {"role": "user", "content": task}
    ]
    while True:
        completion = _create_completion_with_retry(messages)
        choice = completion.choices[0]
        finish_reason = choice.finish_reason

        if finish_reason == "stop":
            # Guard against a missing content field before splitting.
            content = choice.message.content or ""
            print(f"{BOLD}Final answer: {content}{RESET}")
            return content.split("FINAL ANSWER:")[-1].strip()

        if finish_reason != "tool_calls":
            raise RuntimeError(f"Unexpected finish_reason from the model: {finish_reason!r}")

        # The assistant message carrying the tool calls must precede the
        # tool results in the conversation.
        messages.append(choice.message)
        for tool_call in choice.message.tool_calls:
            name = tool_call.function.name
            args = tool_call.function.arguments
            try:
                # Parsing sits inside the try so malformed arguments are
                # reported back to the model instead of aborting the question.
                args = json.loads(args)
                result = call_function(name, args)
            except Exception as e:
                # Deliberately broad: any tool failure becomes feedback for
                # the model rather than a crash.
                result = "Error calling function: " + str(e)
            print(f"Called {BOLD}{name}({args}){RESET} and it returned {GRAY}{str(result)[:300]}{RESET}")
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": str(result)
            })
def process_question(question_data: dict[str, Any]) -> dict[str, str]:
    """
    Run the agent on a single question record and package the outcome.

    Args:
        question_data: One question record; its "task_id" and "question"
            entries are read.

    Returns:
        A dict holding the task id, the agent's answer under
        "submitted_answer", and the original question text.
    """
    # NOTE: only the question text reaches the agent. Downloading the file
    # referenced by "file_name" and handing it to the agent is currently
    # disabled.
    question_text = question_data.get("question")
    return {
        "task_id": question_data.get("task_id"),
        "submitted_answer": run_agent(question_text),
        "question": question_text,
    }
def run_agents_parallel(questions_data: List[Dict[str, Any]], max_workers: int = 4) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """
    Process all questions concurrently on a thread pool.

    Questions whose processing raises are reported on the console and left
    out of the submission. Answers are collected in completion order.

    Args:
        questions_data: The question records to process.
        max_workers: Number of worker threads.

    Returns:
        A ``(submission_data, results_log)`` tuple: the payload for the
        submit endpoint and the list of per-question result dicts.
    """
    start = time.time()
    answers = []
    results_log = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_question = {executor.submit(process_question, q): q for q in questions_data}
        # as_completed() has no length, so pass the total explicitly or the
        # progress bar cannot show how far along the run is.
        completed = tqdm(
            concurrent.futures.as_completed(future_to_question),
            total=len(future_to_question),
        )
        for future in completed:
            try:
                answer = future.result()
            except Exception as e:
                # Name the failing task so it can be retried or investigated.
                task_id = future_to_question[future].get("task_id")
                print(f"Question processing failed for task {task_id}: {e}")
            else:
                results_log.append(answer)
                answers.append(answer)
    submission_data = {
        "username": "vladi",
        "agent_code": "https://huggingface.co/spaces/vladi/AgentsGAIAFun",
        "answers": answers
    }
    end = time.time()
    print(f"Processing time (parallel): {end - start:.2f} seconds")
    return submission_data, results_log
def run_agents(questions_data: List[Dict[str, Any]]) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """
    Process all questions one after another.

    An exception raised while processing a question propagates and aborts
    the run.

    Args:
        questions_data: The question records to process.

    Returns:
        A ``(submission_data, results_log)`` tuple: the payload for the
        submit endpoint and the list of per-question result dicts.
    """
    start = time.time()
    answers = []
    results_log = []
    for question_data in tqdm(questions_data):
        answer = process_question(question_data)
        results_log.append(answer)
        answers.append(answer)
    submission_data = {
        "username": "vladi",
        "agent_code": "https://huggingface.co/spaces/vladi/AgentsGAIAFun",
        "answers": answers
    }
    end = time.time()
    print(f"Processing time (sequential): {end - start:.2f} seconds")
    return submission_data, results_log
def submit_answers(submission_data: dict):
    """
    POST the collected answers to the scoring service.

    Args:
        submission_data: Payload to send; its "answers" entry is counted for
            the console message.

    Returns:
        The decoded JSON body of the service's response.

    Raises:
        requests.HTTPError: If the service answers with an error status.
    """
    answer_count = len(submission_data['answers'])
    print(f"Submitting {answer_count} answers")
    submit_response = requests.post(
        f"{test_api_base}/submit",
        json=submission_data,
        timeout=60,
    )
    submit_response.raise_for_status()
    return submit_response.json()
# Run every question sequentially. For a shorter run slice the input
# (e.g. questions_data[:20]); for the threaded variant use
# run_agents_parallel(questions_data) instead.
submission_data, results_log = run_agents(questions_data)
results_df = pd.DataFrame(results_log)
# Last but not least... submit the answers and show the scoring response,
# which would otherwise be discarded when this runs as a plain script.
submission_result = submit_answers(submission_data)
print(submission_result)