Spaces:
Sleeping
Sleeping
import os | |
import re | |
import json | |
import time | |
import requests | |
from typing import List, Dict, Any, Optional | |
from pathlib import Path | |
import tempfile | |
from io import BytesIO | |
# Imports pour les outils | |
from ddgs import DDGS | |
from bs4 import BeautifulSoup | |
from selenium import webdriver | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from webdriver_manager.chrome import ChromeDriverManager | |
from fake_useragent import UserAgent | |
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound | |
# Imports pour la lecture de fichiers | |
import PyPDF2 | |
from docx import Document | |
import openpyxl | |
import pandas as pd | |
import markdown | |
import magic | |
from PIL import Image | |
import numpy as np | |
from smolagents import tool | |
class AdvancedWebSearchTool: | |
"""Outil de recherche web avancé avec DuckDuckGo""" | |
def __init__(self): | |
self.ddgs = DDGS() | |
self.ua = UserAgent() | |
def search_web(self, query: str, max_results: int = 10, region: str = "en-en") -> List[Dict[str, str]]: | |
""" | |
Recherche avancée sur le web avec DuckDuckGo | |
Args: | |
query: Requête de recherche | |
max_results: Nombre maximum de résultats (défaut: 10) | |
region: Région de recherche (défaut: en-en) | |
Returns: | |
Liste de dictionnaires avec title, url, snippet | |
""" | |
try: | |
print(f"🔍 Recherche: '{query}' (max: {max_results})") | |
results = [] | |
search_results = self.ddgs.text( | |
query=query, | |
region=region, | |
max_results=max_results, | |
timelimit="m" # Résultats récents | |
) | |
for result in search_results: | |
results.append({ | |
"title": result.get("title", ""), | |
"url": result.get("href", ""), | |
"snippet": result.get("body", "") | |
}) | |
print(f"✅ Trouvé {len(results)} résultats") | |
return results | |
except Exception as e: | |
print(f"❌ Erreur lors de la recherche: {e}") | |
return [{"error": str(e)}] | |
class AdvancedWebScrapingTool: | |
"""Outil de scraping web avancé avec Selenium et BeautifulSoup""" | |
def __init__(self): | |
self.ua = UserAgent() | |
self._driver = None | |
def _get_driver(self): | |
"""Initialise le driver Selenium si nécessaire""" | |
if self._driver is None: | |
options = Options() | |
options.add_argument("--headless") | |
options.add_argument("--no-sandbox") | |
options.add_argument("--disable-dev-shm-usage") | |
options.add_argument(f"--user-agent={self.ua.random}") | |
options.add_argument("--disable-gpu") | |
options.add_argument("--window-size=1920,1080") | |
try: | |
self._driver = webdriver.Chrome( | |
options=options | |
) | |
except Exception as e: | |
print(f"❌ Erreur driver Selenium: {e}") | |
return None | |
return self._driver | |
def scrape_website(self, url: str, extract_text: bool = True, extract_links: bool = False) -> Dict[str, Any]: | |
""" | |
Scrape un site web et extrait le contenu | |
Args: | |
url: URL à scraper | |
extract_text: Extraire le texte principal | |
extract_links: Extraire les liens | |
Returns: | |
Dictionnaire avec le contenu extrait | |
""" | |
try: | |
print(f"🌐 Scraping: {url}") | |
# Tentative avec requests d'abord (plus rapide) | |
headers = { | |
'User-Agent': self.ua.random, | |
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', | |
'Accept-Language': 'fr-FR,fr,en-EN,en;q=0.5', | |
'Accept-Encoding': 'gzip, deflate', | |
'Connection': 'keep-alive', | |
} | |
response = requests.get(url, headers=headers, timeout=30) | |
response.raise_for_status() | |
soup = BeautifulSoup(response.content, 'html.parser') | |
result = { | |
"url": url, | |
"title": "", | |
"text": "", | |
"links": [], | |
"status": "success" | |
} | |
# Titre | |
title_tag = soup.find('title') | |
if title_tag: | |
result["title"] = title_tag.get_text().strip() | |
# Texte principal | |
if extract_text: | |
# Supprime les scripts et styles | |
for script in soup(["script", "style", "nav", "footer", "header"]): | |
script.decompose() | |
# Extrait le texte principal | |
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile(r'content|main|article')) | |
if main_content: | |
text = main_content.get_text(separator=' ', strip=True) | |
else: | |
text = soup.get_text(separator=' ', strip=True) | |
# Nettoie le texte | |
text = re.sub(r'\s+', ' ', text) | |
result["text"] = text[:10000] # Limite à 10k caractères | |
# Liens | |
if extract_links: | |
links = [] | |
for link in soup.find_all('a', href=True): | |
href = link['href'] | |
if href.startswith('http'): | |
links.append({ | |
"url": href, | |
"text": link.get_text().strip() | |
}) | |
result["links"] = links[:50] # Limite à 50 liens | |
print(f"✅ Scraping réussi: {len(result['text'])} caractères") | |
return result | |
except requests.exceptions.RequestException as e: | |
# Tentative avec Selenium si requests échoue | |
return self._scrape_with_selenium(url, extract_text, extract_links) | |
except Exception as e: | |
print(f"❌ Erreur scraping: {e}") | |
return {"error": str(e), "url": url} | |
def _scrape_with_selenium(self, url: str, extract_text: bool, extract_links: bool) -> Dict[str, Any]: | |
"""Scraping avec Selenium en fallback""" | |
try: | |
print("🤖 Tentative avec Selenium...") | |
driver = self._get_driver() | |
if not driver: | |
return {"error": "Impossible d'initialiser le driver", "url": url} | |
driver.get(url) | |
WebDriverWait(driver, 10).until( | |
EC.presence_of_element_located((By.TAG_NAME, "body")) | |
) | |
result = { | |
"url": url, | |
"title": driver.title, | |
"text": "", | |
"links": [], | |
"status": "success_selenium" | |
} | |
if extract_text: | |
# Supprime les éléments indésirables | |
driver.execute_script(""" | |
var elements = document.querySelectorAll('script, style, nav, footer, header'); | |
for (var i = 0; i < elements.length; i++) { | |
elements[i].remove(); | |
} | |
""") | |
text = driver.find_element(By.TAG_NAME, "body").text | |
result["text"] = text[:10000] | |
if extract_links: | |
links = [] | |
for link in driver.find_elements(By.TAG_NAME, "a"): | |
href = link.get_attribute("href") | |
if href and href.startswith("http"): | |
links.append({ | |
"url": href, | |
"text": link.text.strip() | |
}) | |
result["links"] = links[:50] | |
return result | |
except Exception as e: | |
print(f"❌ Erreur Selenium: {e}") | |
return {"error": str(e), "url": url} | |
def __del__(self): | |
"""Nettoie le driver à la destruction""" | |
if self._driver: | |
try: | |
self._driver.quit() | |
except: | |
pass | |
class FileReaderTool: | |
"""Outil de lecture de fichiers multi-format""" | |
def read_file(self, file_path: str) -> Dict[str, Any]: | |
""" | |
Lit un fichier et extrait son contenu selon le format | |
Args: | |
file_path: Chemin vers le fichier | |
Returns: | |
Dictionnaire avec le contenu du fichier | |
""" | |
try: | |
print(f"📄 Lecture du fichier: {file_path}") | |
if not os.path.exists(file_path): | |
return {"error": f"Fichier non trouvé: {file_path}"} | |
# Détection du type de fichier | |
file_ext = Path(file_path).suffix.lower() | |
result = { | |
"file_path": file_path, | |
"file_type": file_ext, | |
"content": "", | |
"metadata": {}, | |
"status": "success" | |
} | |
if file_ext == '.pdf': | |
result.update(self._read_pdf(file_path)) | |
# Word Documents | |
elif file_ext in ['.docx', '.doc']: | |
result.update(self._read_docx(file_path)) | |
# Excel | |
elif file_ext in ['.xlsx', '.xls']: | |
result.update(self._read_excel(file_path)) | |
# CSV | |
elif file_ext == '.csv': | |
result.update(self._read_csv(file_path)) | |
# JSON | |
elif file_ext == '.json': | |
result.update(self._read_json(file_path)) | |
# Markdown | |
elif file_ext in ['.md', '.markdown']: | |
result.update(self._read_markdown(file_path)) | |
# Texte simple | |
elif file_ext in ['.txt', '.log', '.py', '.js', '.html', '.css']: | |
result.update(self._read_text(file_path)) | |
# Images | |
elif file_ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp']: | |
result.update(self._read_image(file_path)) | |
else: | |
# Tentative de lecture comme texte | |
result.update(self._read_text(file_path)) | |
print(f"✅ Fichier lu avec succès: {len(str(result['content']))} caractères") | |
return result | |
except Exception as e: | |
print(f"❌ Erreur lecture fichier: {e}") | |
return {"error": str(e), "file_path": file_path} | |
def _read_pdf(self, file_path: str) -> Dict[str, Any]: | |
"""Lit un fichier PDF""" | |
try: | |
with open(file_path, 'rb') as file: | |
pdf_reader = PyPDF2.PdfReader(file) | |
text = "" | |
for page in pdf_reader.pages: | |
text += page.extract_text() + "\n" | |
return { | |
"content": text.strip(), | |
"metadata": { | |
"pages": len(pdf_reader.pages), | |
"title": pdf_reader.metadata.get('/Title', '') if pdf_reader.metadata else '' | |
} | |
} | |
except Exception as e: | |
return {"error": f"Erreur PDF: {e}"} | |
def _read_docx(self, file_path: str) -> Dict[str, Any]: | |
"""Lit un fichier Word""" | |
try: | |
doc = Document(file_path) | |
text = "\n".join([paragraph.text for paragraph in doc.paragraphs]) | |
return { | |
"content": text, | |
"metadata": { | |
"paragraphs": len(doc.paragraphs), | |
"core_properties": str(doc.core_properties.title) if doc.core_properties.title else "" | |
} | |
} | |
except Exception as e: | |
return {"error": f"Erreur DOCX: {e}"} | |
def _read_excel(self, file_path: str) -> Dict[str, Any]: | |
"""Lit un fichier Excel""" | |
try: | |
df = pd.read_excel(file_path, sheet_name=None) | |
content = {} | |
for sheet_name, sheet_df in df.items(): | |
content[sheet_name] = { | |
"data": sheet_df.to_dict('records')[:100], # Limite à 100 lignes | |
"shape": sheet_df.shape, | |
"columns": list(sheet_df.columns) | |
} | |
return { | |
"content": content, | |
"metadata": { | |
"sheets": list(df.keys()), | |
"total_sheets": len(df) | |
} | |
} | |
except Exception as e: | |
return {"error": f"Erreur Excel: {e}"} | |
def _read_csv(self, file_path: str) -> Dict[str, Any]: | |
"""Lit un fichier CSV""" | |
try: | |
df = pd.read_csv(file_path) | |
return { | |
"content": { | |
"data": df.head(100).to_dict('records'), # Premières 100 lignes | |
"shape": df.shape, | |
"columns": list(df.columns), | |
"dtypes": df.dtypes.to_dict() | |
}, | |
"metadata": { | |
"rows": len(df), | |
"columns": len(df.columns) | |
} | |
} | |
except Exception as e: | |
return {"error": f"Erreur CSV: {e}"} | |
def _read_json(self, file_path: str) -> Dict[str, Any]: | |
"""Lit un fichier JSON""" | |
try: | |
with open(file_path, 'r', encoding='utf-8') as file: | |
data = json.load(file) | |
return { | |
"content": data, | |
"metadata": { | |
"type": type(data).__name__, | |
"size": len(str(data)) | |
} | |
} | |
except Exception as e: | |
return {"error": f"Erreur JSON: {e}"} | |
def _read_markdown(self, file_path: str) -> Dict[str, Any]: | |
"""Lit un fichier Markdown""" | |
try: | |
with open(file_path, 'r', encoding='utf-8') as file: | |
content = file.read() | |
html = markdown.markdown(content) | |
return { | |
"content": content, | |
"metadata": { | |
"html_version": html, | |
"lines": len(content.split('\n')) | |
} | |
} | |
except Exception as e: | |
return {"error": f"Erreur Markdown: {e}"} | |
def _read_text(self, file_path: str) -> Dict[str, Any]: | |
"""Lit un fichier texte""" | |
try: | |
# Détection de l'encodage | |
encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'] | |
for encoding in encodings: | |
try: | |
with open(file_path, 'r', encoding=encoding) as file: | |
content = file.read() | |
return { | |
"content": content, | |
"metadata": { | |
"encoding": encoding, | |
"lines": len(content.split('\n')), | |
"characters": len(content) | |
} | |
} | |
except UnicodeDecodeError: | |
continue | |
return {"error": "Impossible de décoder le fichier"} | |
except Exception as e: | |
return {"error": f"Erreur texte: {e}"} | |
def _read_image(self, file_path: str) -> Dict[str, Any]: | |
"""Lit une image et extrait les métadonnées""" | |
try: | |
with Image.open(file_path) as img: | |
return { | |
"content": f"Image {img.format} - Taille: {img.size[0]}x{img.size[1]}", | |
"metadata": { | |
"format": img.format, | |
"size": img.size, | |
"mode": img.mode, | |
"path": file_path | |
} | |
} | |
except Exception as e: | |
return {"error": f"Erreur image: {e}"} | |
# Initialisation et Export des Outils | |
# On instancie les classes qui contiennent la logique des outils | |
_web_search_tool_instance = AdvancedWebSearchTool() | |
_web_scraping_tool_instance = AdvancedWebScrapingTool() | |
_file_reader_tool_instance = FileReaderTool() | |
# On définit des fonctions "wrapper" autonomes et on les décore avec @tool | |
def search_web(query: str, max_results: int = 10, region: str = "fr-fr") -> List[Dict[str, str]]: | |
""" | |
Recherche avancée sur le web avec DuckDuckGo pour trouver des informations à jour. | |
Args: | |
query: Requête de recherche. | |
max_results: Nombre maximum de résultats à retourner. | |
region: Région de recherche (ex: 'fr-fr', 'wt-wt'). | |
Returns: | |
Liste de résultats de recherche, chacun contenant un titre, une URL et un snippet. | |
""" | |
return _web_search_tool_instance.search_web(query, max_results=max_results, region=region) | |
def scrape_website(url: str, extract_text: bool = True, extract_links: bool = False) -> Dict[str, Any]: | |
""" | |
Scrape une page web pour en extraire le contenu textuel principal, le titre et les liens. | |
Args: | |
url: URL de la page web à scraper. | |
extract_text: Si True, extrait le texte principal de la page. | |
extract_links: Si True, extrait les liens hypertextes de la page. | |
Returns: | |
Un dictionnaire contenant le contenu extrait du site. | |
""" | |
return _web_scraping_tool_instance.scrape_website(url, extract_text=extract_text, extract_links=extract_links) | |
def read_file(file_path: str) -> Dict[str, Any]: | |
""" | |
Lit un fichier local et en extrait le contenu. Gère divers formats (PDF, DOCX, XLSX, CSV, JSON, TXT, etc.). | |
Args: | |
file_path: Chemin d'accès local au fichier à lire. | |
Returns: | |
Un dictionnaire contenant le contenu du fichier et ses métadonnées. | |
""" | |
return _file_reader_tool_instance.read_file(file_path) | |
def get_youtube_transcript(video_url: str) -> Dict[str, Any]: | |
""" | |
Récupère la transcription d'une vidéo YouTube à partir de son URL standard. | |
Args: | |
video_url (str): L'URL complète et standard de la vidéo YouTube dont la transcription est nécessaire. | |
Returns: | |
Un dictionnaire contenant la transcription complète sous la clé 'transcript' en cas de succès, | |
ou un message d'erreur sous la clé 'error' en cas d'échec. | |
""" | |
try: | |
video_id = None | |
# Cherche l'ID après "v=" | |
if "v=" in video_url: | |
video_id = video_url.split("v=")[1].split('&')[0] | |
# Cherche l'ID dans les URLs du benchmark | |
elif video_url.startswith("http://googleusercontent.com/youtube.com/"): | |
video_id = video_url.split('/')[-1] | |
if not video_id: | |
return {"error": f"Impossible d'extraire l'ID de la vidéo depuis l'URL : {video_url}"} | |
print(f"📖 Récupération de la transcription pour la vidéo YouTube : {video_id}") | |
# On demande la transcription en anglais (le plus courant) et en français. | |
transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=['en', 'fr']) | |
transcript_text = " ".join([item['text'] for item in transcript_list]) | |
return {"status": "success", "transcript": transcript_text[:15000]} | |
except (TranscriptsDisabled, NoTranscriptFound) as e: | |
error_msg = f"Impossible de récupérer la transcription pour {video_id}: {type(e).__name__}" | |
print(f"✅ {error_msg}") | |
return {"status": "error", "error": error_msg} | |
except Exception as e: | |
error_msg = f"Erreur inattendue pour {video_id}: {e}" | |
print(f"❌ {error_msg}") | |
return {"error": error_msg} |