Spaces:
Sleeping
Sleeping
import os | |
import re | |
import json | |
from smolagents import MultiStepAgent, PythonInterpreterTool | |
from smolagents.agents import ActionOutput | |
from smolagents.utils import AgentToolExecutionError | |
from smolagents.memory import ToolCall | |
from models import ModelManager | |
from tools import search_web, scrape_website, read_file, get_youtube_transcript | |
class MonAgent(MultiStepAgent): | |
""" | |
Un agent multi-étapes avec l'implémentation COMPLÈTE de TOUTES les méthodes | |
requises et optionnelles pour garantir un fonctionnement sans erreur | |
d'implémentation manquante. | |
""" | |
def initialize_system_prompt(self) -> str: | |
""" | |
[PIÈCE 1/4 - OBLIGATOIRE] Définit la "personnalité" et les instructions | |
de base de l'agent. C'est la première chose que l'on doit implémenter. | |
""" | |
# Ce prompt est optimisé pour forcer une sortie JSON claire (enfin j'espère) | |
return """You are a world-class autonomous agent. Your goal is to fully answer the user's question by creating a plan and using tools. | |
You MUST format your response as a JSON object containing a "plan" key, which is a list of tool calls. | |
If you don't respond to a JSON object then everything will fail, it's a matter of life and death. | |
Each tool call is a dictionary with "tool" and "args". | |
Example of a valid response with a tool call: | |
{ | |
"plan": [ | |
{ | |
"tool": "search_web", | |
"args": { | |
"query": "Who is the current president of France?" | |
} | |
} | |
] | |
} | |
If you have the final answer, respond with an empty plan and the answer in the 'final_answer' key: | |
{ | |
"plan": [], | |
"final_answer": "The final answer is..." | |
} | |
**Your constraints are:** | |
1. **Analyze your plan:** Before executing, review your plan. Do not repeat actions that you have already performed. If you have already gathered the necessary information, move on to answering the question. | |
2. **Use tools effectively:** You have access to a variety of tools. Choose the best tool for each step. | |
3. **Final Answer Format:** When you have the final answer, you MUST respond with the answer and ONLY the answer. Do not include any extra text, explanations, or conversational phrases like "The answer is...". Your response should be an exact match to what is being asked. | |
**CRITICAL RULES:** | |
1. **NO LOOPS:** Before each step, review your action history. NEVER repeat a tool call with the same arguments. If a search did not yield the answer, do not repeat it. Instead, formulate a new, more specific query or use a different tool. | |
2. **THINK, THEN ACT:** After a search, analyze the results. If a source looks promising, your next step should be to SCRAPE it, not to search again. | |
3. **USE THE PYTHON TOOL:** For any task involving logic, data manipulation, sorting, strict filtering (like botanical classification), or calculations, your primary choice should be the `PythonInterpreterTool`. It is more reliable than your own reasoning for these tasks. | |
4. **FINAL ANSWER FORMAT:** Your final answer MUST be the answer and nothing else. No extra words, no "The answer is...", no explanations. Just the value. Example: for 'What is the capital of France?', the answer is 'Paris'. | |
""" | |
def _step_stream(self, memory_step): | |
""" | |
Le cœur de l'agent : décide de la prochaine action. | |
""" | |
memory_messages = self.write_memory_to_messages() | |
memory_step.model_input_messages = memory_messages | |
try: | |
chat_message = self.model.generate( | |
memory_messages, | |
tools_to_call_from=list(self.tools.values()), | |
) | |
memory_step.model_output_message = chat_message | |
memory_step.token_usage = chat_message.token_usage | |
except Exception as e: | |
raise Exception(f"Erreur lors de la génération du modèle : {e}") | |
chat_tool_calls = chat_message.tool_calls | |
if not chat_tool_calls: | |
yield ActionOutput(output=chat_message.content, is_final_answer=True) | |
return | |
# Conversion de ChatMessageToolCall en ToolCall | |
tool_calls_for_memory = [ | |
ToolCall(name=tc.function.name, arguments=tc.function.arguments, id=tc.id) | |
for tc in chat_tool_calls | |
] | |
memory_step.tool_calls = tool_calls_for_memory | |
# ---------------------------------------------------------------- | |
final_answer = None | |
is_final = False | |
for i, tool_call in enumerate(chat_tool_calls): | |
yield tool_call # On continue de yield l'objet original pour le stream | |
tool_name = tool_call.function.name | |
tool_arguments = tool_call.function.arguments | |
tool_output_value = self.execute_tool_call(tool_name, tool_arguments) | |
if tool_name == "final_answer": | |
final_answer = tool_output_value | |
is_final = True | |
observation = self.render_tool_result(tool_output_value) | |
# On met à jour l'observation dans la mémoire | |
if memory_step.observations is None: | |
memory_step.observations = "" | |
memory_step.observations += f"\nObservation de l'outil '{tool_name}':\n{observation}" | |
yield {"tool_call_id": tool_call.id, "output": observation} | |
yield ActionOutput(output=final_answer, is_final_answer=is_final) | |
def execute_tool_call(self, tool_name: str, arguments: any) -> any: | |
""" | |
Exécute un outil avec les arguments fournis, en gérant les arguments | |
sous forme de chaîne de caractères ou de dictionnaire. | |
""" | |
if tool_name not in self.tools: | |
raise AgentToolExecutionError(f"Outil inconnu '{tool_name}'.", self.logger) | |
tool = self.tools[tool_name] | |
# Gestion des arguments sous forme de chaîne | |
parsed_arguments = arguments | |
if isinstance(parsed_arguments, str): | |
try: | |
# Essayer de parser la chaîne comme du JSON | |
parsed_arguments = json.loads(parsed_arguments) | |
except json.JSONDecodeError: | |
# Si ce n'est pas du JSON, on la passe telle quelle | |
pass | |
try: | |
if isinstance(parsed_arguments, dict): | |
return tool(**parsed_arguments) | |
else: | |
# Si ce n'est pas un dictionnaire, on passe l'argument directement | |
return tool(parsed_arguments) | |
except Exception as e: | |
raise AgentToolExecutionError(f"Erreur lors de l'exécution de l'outil '{tool_name}' avec les arguments {arguments}: {type(e).__name__}: {e}", self.logger) | |
def parse_plan(self, response: str) -> list[dict]: | |
""" | |
Transforme le texte brut du modèle en une liste d'actions structurées. | |
""" | |
cleaned_response = response.strip().removeprefix("```json").removesuffix("```").strip() | |
try: | |
parsed_json = json.loads(cleaned_response) | |
return parsed_json.get("plan", []) | |
except json.JSONDecodeError: | |
print(f"⚠️ Erreur de parsing JSON dans `parse_plan`. Réponse reçue:\n{response}") | |
return [] | |
def render_tool_result(self, tool_output: any) -> str: | |
""" | |
Transforme le résultat d'un outil (qui peut être n'importe quel objet Python) en un texte simple que l'IA peut comprendre pour la suite. | |
C'était probablement la pièce manquante principale. | |
""" | |
print(f"⚙️ Formatage du résultat de l'outil: {str(tool_output)[:300]}...") | |
if isinstance(tool_output, str): | |
return tool_output | |
if isinstance(tool_output, (list, dict)): | |
try: | |
return json.dumps(tool_output, indent=2, ensure_ascii=False) | |
except TypeError: | |
return str(tool_output) | |
return str(tool_output) | |
def render_final_answer(self, final_context: dict, final_response: str) -> str: | |
""" | |
Est appelée à la toute fin pour formater la réponse finale. | |
""" | |
# Essaye de parser une réponse finale structurée en JSON | |
cleaned_response = final_response.strip().removeprefix("```json").removesuffix("```").strip() | |
try: | |
parsed_json = json.loads(cleaned_response) | |
return parsed_json.get("final_answer", final_response) | |
except json.JSONDecodeError: | |
return final_response | |
class BasicAgent: | |
""" | |
Classe de compatibilité qui utilise notre nouvel agent complet et robuste. | |
Elle gère également la pré-analyse des entrées multimodales. | |
""" | |
def __init__(self): | |
print("Initialisation du BasicAgent...") | |
try: | |
if not os.getenv("HF_TOKEN"): | |
print("⚠️ Attention: Le token Hugging Face (HF_TOKEN) n'est pas défini.") | |
self.tools_list = [ | |
search_web, | |
scrape_website, | |
read_file, | |
PythonInterpreterTool(), | |
get_youtube_transcript | |
] | |
self.agent = MonAgent( | |
model=ModelManager().get_orchestrator(), | |
tools=self.tools_list | |
) | |
print("BasicAgent initialisé avec succès") | |
except Exception as e: | |
print(f"❌ Erreur critique lors de l'initialisation: {e}") | |
self.agent = None | |
def __call__(self, question: str, metadata: dict = None) -> str: | |
""" | |
Le point d'entrée de l'agent. Gère la pré-analyse en utilisant les métadonnées fournies. | |
""" | |
if self.agent is None: | |
return "Erreur: L'agent n'a pas pu être initialisé." | |
print(f"\n{'='*40}\n🤖 NOUVELLE QUESTION: {question}\n{'='*40}") | |
if metadata: | |
print(f"🕵️♂️ Métadonnées reçues: {metadata}") | |
else: | |
print("🕵️♂️ Aucune métadonnée reçue.") | |
augmented_prompt = question | |
if metadata: | |
url_from_meta = metadata.get("url") or metadata.get("video_url") or metadata.get("image_url") | |
if url_from_meta: | |
print(f"🔗 URL trouvée dans les métadonnées : {url_from_meta}") | |
context_from_url = "" | |
# On vérifie si l'URL est une image | |
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'] | |
is_image = any(url_from_meta.lower().endswith(ext) for ext in image_extensions) | |
# logique images | |
if any(url_from_meta.lower().endswith(ext) for ext in image_extensions): | |
print("🖼️ C'est une URL d'image, analyse avec le modèle de vision...") | |
try: | |
vision_model = ModelManager().get_vision_model() | |
# On demande au modèle de vision de décrire l'image en détail | |
# pour aider l'agent principal à répondre à la question. | |
vision_response = vision_model.generate( | |
[{"role": "user", "content": [ | |
{"type": "text", "text": f"Describe this image in detail. This description will be used to answer the following question: '{question}'"}, | |
{"type": "image_url", "image_url": {"url": url_from_meta}} | |
]}] | |
) | |
context_from_url = f"Here is a detailed description of the image provided:\n{vision_response.content}" | |
print("✅ Description de l'image obtenue.") | |
except Exception as e: | |
error_msg = f"Erreur lors de l'analyse de l'image : {e}" | |
print(f"❌ {error_msg}") | |
context_from_url = error_msg | |
# logique vidéos | |
elif url_from_meta.startswith("http://googleusercontent.com/youtube.com/") or url_from_meta.startswith("http://youtube.com/"): | |
print("📹 C'est une URL YouTube, récupération de la transcription...") | |
transcript_result = get_youtube_transcript(url_from_meta) | |
if "error" in transcript_result: | |
context_from_url = f"Error getting transcript: {transcript_result['error']}" | |
else: | |
context_from_url = f"Here is the transcript of the video:\n{transcript_result['transcript']}" | |
# On construit le prompt augmenté pour l'agent | |
if context_from_url: | |
augmented_prompt = ( | |
f"CONTEXTUAL INFORMATION:\n---\n{context_from_url}\n---\n" | |
f"Based on the context above, please answer the following question:\n{question}" | |
) | |
print(f"✨ Prompt augmenté pour l'agent.") | |
try: | |
return self.agent.run(augmented_prompt) | |
except Exception as e: | |
import traceback | |
print(f"❌ Erreur irrécupérable lors du traitement par MonAgent: {e}\n{traceback.format_exc()}") | |
return f"Une erreur irrécupérable s'est produite: {e}" |