from youtube_transcript_api.formatters import TextFormatter
from youtube_transcript_api import YouTubeTranscriptApi
import requests
from typing import Dict, List, Optional, Any, Union
import os
from dotenv import load_dotenv
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition
from langgraph.prebuilt import ToolNode
# from langchain_google_genai import ChatGoogleGenerativeAI
# from langchain_groq import ChatGroq
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.document_loaders import ArxivLoader
from langchain_community.vectorstores import SupabaseVectorStore
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_core.tools import tool
from langchain.tools.retriever import create_retriever_tool
from langchain_community.utilities import RequestsWrapper
# from supabase.client import Client, create_client
# from langchain.tools.requests.toolkit import RequestsToolkit  # Added for RequestsToolkit
from langchain_community.tools import RequestsPostTool, RequestsGetTool

load_dotenv()

requests_wrapper = RequestsWrapper()


def multiply(a: int, b: int) -> int:
    """Multiply two numbers.

    Args:
        a: first int
        b: second int
    """
    return a * b


def add(a: int, b: int) -> int:
    """Add two numbers.

    Args:
        a: first int
        b: second int
    """
    return a + b


def subtract(a: int, b: int) -> int:
    """Subtract two numbers.

    Args:
        a: first int
        b: second int
    """
    return a - b


def divide(a: int, b: int) -> float:
    """Divide two numbers.

    Args:
        a: first int
        b: second int
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b


def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.

    Args:
        a: first int
        b: second int
    """
    return a % b


def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 5 results.

    Args:
        query: The search query. Be specific with search terms including full names, dates, and relevant keywords.
    """
    if not query or query.strip() == "":
        return "Error: Please provide a valid search query with specific terms."
    try:
        search_docs = WikipediaLoader(query=query, load_max_docs=5).load()
        if not search_docs:
            return f"No Wikipedia results found for '{query}'. Consider refining your search terms."
        formatted_search_docs = "\n\n---\n\n".join(
            [
                f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
                for doc in search_docs
            ])
        return formatted_search_docs
    except Exception as e:
        return f"Error searching Wikipedia: {str(e)}. Please try a different query."


def web_search(query: str) -> str:
    """Search Tavily for a query and return maximum 3 results.

    Args:
        query: The search query.
    """
    try:
        # TavilySearchResults returns a list of dicts with "url" and "content"
        # keys (not Document objects), so index into each result directly.
        search_docs = TavilySearchResults(max_results=3).invoke(input=query)
        formatted_search_docs = "\n\n---\n\n".join(
            [
                f'<Document source="{doc["url"]}">\n{doc["content"]}\n</Document>'
                for doc in search_docs
            ])
        return formatted_search_docs
    except Exception as e:
        return f"Error searching the web: {str(e)}. Please try a different query."


def arxiv_search(query: str) -> str:
    """Search Arxiv for a query and return maximum 3 results.

    Args:
        query: The search query.
    """
    try:
        search_docs = ArxivLoader(query=query, load_max_docs=3).load()
        # ArxivLoader document metadata uses "Title"/"Published" keys rather
        # than "source", so use .get() to avoid a KeyError.
        formatted_search_docs = "\n\n---\n\n".join(
            [
                f'<Document source="{doc.metadata.get("Title", "arxiv")}" published="{doc.metadata.get("Published", "")}">\n{doc.page_content[:1000]}\n</Document>'
                for doc in search_docs
            ])
        return formatted_search_docs
    except Exception as e:
        return f"Error searching Arxiv: {str(e)}. Please try a different query."
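

# Illustrative usage sketch for the search tools above (assumes a valid
# TAVILY_API_KEY in the environment for web_search). Never called by the
# agent itself; safe to delete.
def _demo_search_tools() -> None:
    print(wiki_search("Mercedes Sosa discography")[:300])
    print(web_search("LangGraph ReAct agent tutorial")[:300])
    print(arxiv_search("state space models for sequence modeling")[:300])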


# @tool
# def requests_get(url: str, params: dict = {}) -> str:
#     """Perform an HTTP GET request using LangChain's RequestsToolkit.
#
#     Args:
#         url: The URL to send the GET request to.
#         params: Optional dictionary of query parameters.
#
#     Returns:
#         The response content as text.
#     """
#     toolkit = RequestsGetTool(requests_wrapper=requests_wrapper)
#     # The get method is expected to return a response-like object.
#     response = toolkit.run(url, params=params)
#     return response.text

# Adding request toolkits
requests_get = RequestsGetTool(requests_wrapper=requests_wrapper, allow_dangerous_requests=True)
requests_post = RequestsPostTool(requests_wrapper=requests_wrapper, allow_dangerous_requests=True)
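

# Illustrative usage sketch: RequestsGetTool takes a plain URL string, while
# RequestsPostTool expects a JSON string with "url" and "data" keys. The
# httpbin URL is just a stand-in. Never called by the agent itself; safe to
# delete.
def _demo_requests_tools() -> None:
    print(requests_get.run("https://example.com"))
    print(requests_post.run('{"url": "https://httpbin.org/post", "data": {"key": "value"}}'))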


# @tool
# def requests_post(url: str, data: dict = {}, json: dict = {}, headers: dict = {}) -> str:
#     """Perform an HTTP POST request using LangChain's RequestsToolkit.
#
#     Args:
#         url: The URL to send the POST request to.
#         data: Optional dictionary of form data.
#         json: Optional dictionary to send as JSON.
#         headers: Optional dictionary of HTTP headers.
#
#     Returns:
#         The response content as text.
#     """
#     toolkit = RequestsPostTool(requests_wrapper=requests_wrapper)
#     response = toolkit.run(url, data=data, json=json, headers=headers)
#     return response.text


def date_filter(content: str, start_year: int, end_year: int) -> str:
    """Filter content based on date range and extract relevant information.

    Args:
        content: The text content to filter
        start_year: Starting year (inclusive)
        end_year: Ending year (inclusive)
    """
    if not content or not isinstance(content, str):
        return "Error: No content provided for filtering."
    try:
        # Convert years to strings for matching
        years = [str(year) for year in range(start_year, end_year + 1)]
        # Split content into lines
        paragraphs = content.split("\n")
        # Keep lines containing any year in the range
        filtered_paragraphs = []
        for paragraph in paragraphs:
            if any(f" {year}" in paragraph or f"({year})" in paragraph or f"[{year}]" in paragraph for year in years):
                filtered_paragraphs.append(paragraph)
        if not filtered_paragraphs:
            return f"No content found specifically mentioning years between {start_year} and {end_year}."
        return "\n\n".join(filtered_paragraphs)
    except Exception as e:
        return f"Error filtering by date range: {str(e)}"
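

# Illustrative sketch of date_filter on a tiny discography. Never called by
# the agent itself; safe to delete.
def _demo_date_filter() -> None:
    text = "Album A (1999)\nAlbum B (2003)\nAlbum C (2008)"
    print(date_filter(text, 2000, 2005))  # keeps only the "Album B (2003)" line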


import re


def count_items(content: str, pattern: str, context_words: int = 5) -> str:
    """Count items matching a pattern in content and extract contextual information.

    Args:
        content: The text to analyze
        pattern: The pattern to search for (e.g. "album", "publication")
        context_words: Number of words to include for context around matches
    """
    if not content or not pattern:
        return "Error: Both content and pattern must be provided."
    try:
        # Find all occurrences of the pattern
        matches = re.finditer(r'(?i)\b\w*' + re.escape(pattern) + r'\w*\b', content)
        # Extract context around matches
        contexts = []
        count = 0
        for match in matches:
            count += 1
            start, end = match.span()
            # Get text before and after the match
            text_before = content[max(0, start-100):start]
            text_after = content[end:min(len(content), end+100)]
            # Create contextual excerpt
            context = f"...{text_before}{match.group(0)}{text_after}..."
            contexts.append(context)
        if count == 0:
            return f"No items matching '{pattern}' found in the content."
        result = f"Found {count} occurrences of '{pattern}'. Contexts:\n\n"
        result += "\n---\n".join(contexts[:10])  # Limit to first 10 for brevity
        return result
    except Exception as e:
        return f"Error counting items: {str(e)}"
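

# Illustrative sketch of count_items. Never called by the agent itself; safe
# to delete.
def _demo_count_items() -> None:
    text = "The debut album came out in 1999; a second album followed in 2003."
    print(count_items(text, "album"))  # reports 2 occurrences with context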


def translate_text(text: str, target_language: str) -> str:
    """Translate text to the specified language using a simple translation API.

    Args:
        text: Text to translate
        target_language: Target language (e.g., "Spanish", "French", "German")
    """
    if not text:
        return "Error: No text provided for translation."
    try:
        # Using LibreTranslate API (open-source translation)
        API_URL = "https://translate.argosopentech.com/translate"
        # Map common language names to language codes
        language_map = {
            "english": "en",
            "spanish": "es",
            "french": "fr",
            "german": "de",
            "italian": "it",
            "portuguese": "pt",
            "russian": "ru",
            "japanese": "ja",
            "chinese": "zh",
            "arabic": "ar",
            "hindi": "hi",
            "korean": "ko"
        }
        # Get language code
        target_code = language_map.get(target_language.lower())
        if not target_code:
            return f"Error: Unsupported language '{target_language}'. Supported languages: {', '.join(language_map.keys())}."
        # Prepare request
        payload = {
            "q": text[:500],  # Limit text length to avoid API issues
            "source": "auto",
            "target": target_code
        }
        # Use a timeout so a dead mirror cannot hang the agent
        response = requests.post(API_URL, json=payload, timeout=15)
        if response.status_code == 200:
            translation = response.json().get("translatedText", "")
            return f"Original: {text[:100]}{'...' if len(text) > 100 else ''}\n\nTranslation ({target_language}): {translation}"
        else:
            return f"Translation API error: {response.status_code} - {response.text}"
    except Exception as e:
        return f"Error translating text: {str(e)}"
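

# Illustrative sketch of translate_text (assumes the LibreTranslate mirror
# above is reachable). Never called by the agent itself; safe to delete.
def _demo_translate_text() -> None:
    print(translate_text("Hello, world", "Spanish"))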


def step_by_step_reasoning(problem: str, steps: int = 3) -> str:
    """Break down a complex problem into steps for clearer reasoning.

    Args:
        problem: The problem statement or question to analyze
        steps: Number of reasoning steps (default: 3)
    """
    if not problem:
        return "Error: No problem provided for analysis."
    try:
        # Structure for breaking down any problem
        result = f"Breaking down: {problem}\n\n"
        # Generic reasoning steps that work for many problems
        reasoning_steps = [
            "Identify the key information and requirements in the problem",
            "Determine what knowledge or method is needed to solve it",
            "Apply relevant formulas, data, or logical steps",
            "Verify the solution against the original requirements",
            "Consider alternative approaches or edge cases"
        ]
        # Use only the requested number of steps
        steps_to_use = min(steps, len(reasoning_steps))
        for i in range(steps_to_use):
            result += f"Step {i+1}: {reasoning_steps[i]}\n"
            result += f"This step involves analyzing {problem} by "
            if i == 0:
                # First step focuses on understanding the problem
                keywords = re.findall(r'\b\w{5,}\b', problem)
                key_concepts = [word for word in keywords if len(word) > 4][:3]
                if key_concepts:
                    result += f"identifying key concepts like {', '.join(key_concepts)}. "
                # Identify question type
                if "how many" in problem.lower():
                    result += "This is a counting or quantification problem. "
                elif "when" in problem.lower():
                    result += "This is a timing or chronological problem. "
                elif "where" in problem.lower():
                    result += "This is a location or spatial problem. "
                elif "who" in problem.lower():
                    result += "This is a person or entity identification problem. "
                elif "why" in problem.lower():
                    result += "This is a causation or reasoning problem. "
                result += "We need to extract specific details from the problem statement.\n\n"
            elif i == 1:
                # Second step focuses on approach
                if "between" in problem.lower() and re.search(r'\d{4}', problem):
                    result += "using date filtering to focus on the specific time period. "
                    result += "We need to identify relevant dates and associated events/items.\n\n"
                elif any(word in problem.lower() for word in ["album", "song", "music", "artist", "band"]):
                    result += "examining discography information and music-related details. "
                    result += "We should focus on releases, titles, and years.\n\n"
                elif any(word in problem.lower() for word in ["calculate", "compute", "sum", "average", "total"]):
                    result += "applying mathematical operations to derive a numeric result. "
                    result += "We need to identify the values and operations required.\n\n"
                else:
                    result += "gathering relevant factual information and organizing it logically. "
                    result += "We should separate facts from assumptions.\n\n"
            elif i == 2:
                # Third step focuses on solution path
                result += "determining the specific steps to reach a solution. "
                result += "This may involve counting items, applying formulas, or comparing data.\n\n"
            elif i == 3:
                # Fourth step focuses on verification
                result += "checking our answer against the original question requirements. "
                result += "We should verify that we've fully addressed all parts of the question.\n\n"
            else:
                # Fifth step focuses on alternatives
                result += "considering other approaches or edge cases we might have missed. "
                result += "This ensures our answer is robust and comprehensive.\n\n"
        result += "\nThis structured approach helps organize thinking and ensures a thorough analysis."
        return result
    except Exception as e:
        return f"Error performing step-by-step reasoning: {str(e)}"
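

# Illustrative sketch of step_by_step_reasoning. Never called by the agent
# itself; safe to delete.
def _demo_step_by_step_reasoning() -> None:
    print(step_by_step_reasoning("How many albums were released between 1999 and 2003?", steps=4))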


def analyze_content(content: str, analysis_type: str) -> str:
    """Analyze content for specific information based on analysis type.

    Args:
        content: Text content to analyze
        analysis_type: Type of analysis to perform ('dates', 'names', 'numbers', 'events')
    """
    if not content:
        return "Error: No content provided for analysis."
    analysis_type = analysis_type.lower()
    try:
        if analysis_type == 'dates':
            # Extract dates in various formats
            date_patterns = [
                r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b',  # DD/MM/YYYY or MM/DD/YYYY
                r'\b\d{1,2}\s(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s\d{2,4}\b',  # DD Month YYYY
                r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s\d{1,2}(?:st|nd|rd|th)?,\s\d{2,4}\b',  # Month DD, YYYY
                r'\b\d{4}\b'  # YYYY (years)
            ]
            results = []
            for pattern in date_patterns:
                matches = re.findall(pattern, content, re.IGNORECASE)
                results.extend(matches)
            return f"Found {len(results)} date references:\n\n" + "\n".join(results)
        elif analysis_type == 'names':
            # Basic name extraction (this is simplified, real NER would be better)
            name_pattern = r'\b[A-Z][a-z]+\s[A-Z][a-z]+\b'
            names = re.findall(name_pattern, content)
            return f"Found {len(names)} potential names:\n\n" + "\n".join(names)
        elif analysis_type == 'numbers':
            # Extract numbers and their context
            number_pattern = r'\b\d+(?:,\d+)*(?:\.\d+)?\b'
            numbers = re.findall(number_pattern, content)
            # Get context for each number
            contexts = []
            for number in numbers:
                index = content.find(number)
                start = max(0, index - 50)
                end = min(len(content), index + len(number) + 50)
                context = content[start:end].replace('\n', ' ').strip()
                contexts.append(f"{number}: \"{context}\"")
            return f"Found {len(numbers)} numbers with context:\n\n" + "\n".join(contexts[:20])  # Limit to 20
        elif analysis_type == 'events':
            # Look for event indicators
            event_patterns = [
                r'\b(?:occurred|happened|took place|event|ceremony|concert|release|published|awarded|presented)\b',
                r'\b(?:in|on|during|at)\s\d{4}\b'
            ]
            events = []
            for pattern in event_patterns:
                for match in re.finditer(pattern, content, re.IGNORECASE):
                    start = max(0, match.start() - 100)
                    end = min(len(content), match.end() + 100)
                    context = content[start:end].replace('\n', ' ').strip()
                    events.append(context)
            return f"Found {len(events)} potential events:\n\n" + "\n\n".join(events[:15])  # Limit to 15
        else:
            return f"Error: Unsupported analysis type '{analysis_type}'. Use 'dates', 'names', 'numbers', or 'events'."
    except Exception as e:
        return f"Error during content analysis: {str(e)}"
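

# Illustrative sketch of analyze_content. Never called by the agent itself;
# safe to delete.
def _demo_analyze_content() -> None:
    text = "Jane Smith released the record on March 3, 2001 and sold 50,000 copies."
    print(analyze_content(text, "dates"))
    print(analyze_content(text, "names"))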


def youtube_transcript(url: str, summarize: bool = True) -> str:
    """Extract transcript from YouTube video and optionally summarize it.

    Args:
        url: YouTube video URL or video ID
        summarize: Whether to summarize the transcript (default: True)
    """
    try:
        # Extract video ID from URL
        video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', url)
        if video_id_match:
            video_id = video_id_match.group(1)
        else:
            # Try using the input directly as a video ID
            if len(url) == 11:
                video_id = url
            else:
                return "Error: Invalid YouTube URL or video ID. Please provide a valid YouTube URL."
        # Get transcript (static get_transcript is the pre-1.0
        # youtube-transcript-api interface)
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        formatter = TextFormatter()
        formatted_transcript = formatter.format_transcript(transcript)
        # Get video metadata via the oEmbed endpoint
        response = requests.get(
            f"https://www.youtube.com/oembed?url=http://www.youtube.com/watch?v={video_id}&format=json",
            timeout=10)
        metadata = response.json()
        title = metadata.get("title", "Unknown title")
        author = metadata.get("author_name", "Unknown author")
        if summarize and formatted_transcript:
            # For long transcripts, break into chunks
            max_chunk_length = 4000
            if len(formatted_transcript) > max_chunk_length:
                chunks = [formatted_transcript[i:i+max_chunk_length]
                          for i in range(0, len(formatted_transcript), max_chunk_length)]
                summary = f"Video: \"{title}\" by {author}\n\nTranscript summary (extracted from {len(chunks)} segments):\n\n"
                # Return first and last parts of transcript instead of a full
                # summary for long videos
                summary += f"Beginning of transcript:\n{chunks[0][:500]}...\n\n"
                summary += f"End of transcript:\n{chunks[-1][-500:]}"
                return summary
            else:
                return f"Video: \"{title}\" by {author}\n\nFull transcript:\n\n{formatted_transcript}"
        else:
            return f"Video: \"{title}\" by {author}\n\nFull transcript:\n\n{formatted_transcript}"
    except Exception as e:
        return f"Error extracting YouTube transcript: {str(e)}"
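

# Illustrative sketch of youtube_transcript (assumes the video has a public
# transcript available). Never called by the agent itself; safe to delete.
def _demo_youtube_transcript() -> None:
    print(youtube_transcript("https://www.youtube.com/watch?v=dQw4w9WgXcQ")[:500])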


from io import BytesIO
from PIL import Image


def analyze_image(image_url: str, analysis_type: str = "caption") -> str:
    """Analyze an image from a URL and provide captions, tags, or comprehensive analysis.

    Args:
        image_url: URL of the image to analyze
        analysis_type: Type of analysis to perform (options: "caption", "tags", "objects", "comprehensive")
    """
    if not image_url:
        return "Error: Please provide a valid image URL."
    analysis_type = analysis_type.lower()
    valid_types = ["caption", "tags", "objects", "comprehensive"]
    if analysis_type not in valid_types:
        return f"Error: analysis_type must be one of {', '.join(valid_types)}."
    try:
        # Download the image
        response = requests.get(image_url, timeout=10)
        response.raise_for_status()
        # Process image based on analysis type
        if analysis_type == "caption":
            return caption_image(response.content)
        elif analysis_type == "tags":
            return tag_image(response.content)
        elif analysis_type == "objects":
            return detect_objects(response.content)
        elif analysis_type == "comprehensive":
            # Perform all analyses
            caption_result = caption_image(response.content)
            tags_result = tag_image(response.content)
            objects_result = detect_objects(response.content)
            return f"IMAGE ANALYSIS SUMMARY:\n\n{caption_result}\n\n{tags_result}\n\n{objects_result}"
        # If none of the above conditions are met, return an error string
        return "Error: Unknown analysis type or failed to process image."
    except requests.exceptions.RequestException as e:
        return f"Error downloading image: {str(e)}"
    except Exception as e:
        return f"Error analyzing image: {str(e)}"
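

# Illustrative sketch of analyze_image (the URL is a placeholder; substitute
# any reachable image). Never called by the agent itself; safe to delete.
def _demo_analyze_image() -> None:
    print(analyze_image("https://example.com/sample.jpg", "caption"))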


def caption_image(image_content: bytes) -> str:
    """Generate captions for an image using Hugging Face API."""
    try:
        # Check if we have HF API key in environment
        hf_api_key = os.getenv("HUGGINGFACE_API_TOKEN")
        if hf_api_key:
            # Use Hugging Face API with auth
            api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
            headers = {"Authorization": f"Bearer {hf_api_key}"}
            # Send the raw image bytes as the request body, matching tag_image
            # and detect_objects below (the inference endpoint accepts binary
            # input for image-to-text models).
            response = requests.post(api_url, headers=headers, data=image_content)
            if response.status_code == 200:
                result = response.json()
                if isinstance(result, list) and len(result) > 0:
                    return f"CAPTION: {result[0]['generated_text']}"
                else:
                    return f"CAPTION: {result['generated_text'] if 'generated_text' in result else str(result)}"
            else:
                # Fallback to public API
                return caption_image_public(image_content)
        else:
            # No API key, use public endpoint
            return caption_image_public(image_content)
    except Exception as e:
        return f"Error generating caption: {str(e)}"


def caption_image_public(image_content: bytes) -> str:
    """Caption image using a public API endpoint."""
    try:
        # Convert to PIL image for processing
        image = Image.open(BytesIO(image_content))
        # Resize if too large (to avoid timeouts)
        max_size = 1024
        if max(image.size) > max_size:
            ratio = max_size / max(image.size)
            new_size = (int(image.size[0] * ratio), int(image.size[1] * ratio))
            image = image.resize(new_size, Image.LANCZOS)
        # Convert back to bytes
        buffer = BytesIO()
        image.save(buffer, format="JPEG")
        image_bytes = buffer.getvalue()
        # Call public API
        api_url = "https://api.toonify.photos/caption"  # Example public API
        files = {"image": ("image.jpg", image_bytes, "image/jpeg")}
        response = requests.post(api_url, files=files, timeout=15)
        if response.status_code == 200:
            result = response.json()
            return f"CAPTION: {result.get('caption', 'No caption generated')}"
        else:
            return "CAPTION: Could not generate caption (API error)"
    except Exception:
        return f"CAPTION: Image appears to be a {detect_simple_content(image_content)}"


def tag_image(image_content: bytes) -> str:
    """Generate tags for an image."""
    try:
        # Check if we have HF API key in environment
        hf_api_key = os.getenv("HUGGINGFACE_API_TOKEN")
        if hf_api_key:
            # Use Hugging Face API for image tagging
            api_url = "https://api-inference.huggingface.co/models/google/vit-base-patch16-224"
            headers = {"Authorization": f"Bearer {hf_api_key}"}
            # Send image as binary content
            response = requests.post(api_url, headers=headers, data=image_content)
            if response.status_code == 200:
                tags = response.json()
                # Format results
                formatted_tags = "\n".join([f"- {tag['label']} ({tag['score']:.2%})" for tag in tags[:10]])
                return f"TAGS:\n{formatted_tags}"
            else:
                # Fallback to basic detection
                return f"TAGS:\n- {detect_simple_content(image_content)}"
        else:
            # No API key
            return f"TAGS:\n- {detect_simple_content(image_content)}"
    except Exception as e:
        return f"Error generating tags: {str(e)}"


def detect_objects(image_content: bytes) -> str:
    """Detect objects in an image."""
    try:
        # Check if we have HF API key in environment
        hf_api_key = os.getenv("HUGGINGFACE_API_TOKEN")
        if hf_api_key:
            # Use Hugging Face API for object detection
            api_url = "https://api-inference.huggingface.co/models/facebook/detr-resnet-50"
            headers = {"Authorization": f"Bearer {hf_api_key}"}
            # Send image as binary content
            response = requests.post(api_url, headers=headers, data=image_content)
            if response.status_code == 200:
                objects = response.json()
                # Count objects by label
                object_counts = {}
                for obj in objects:
                    label = obj["label"]
                    if label in object_counts:
                        object_counts[label] += 1
                    else:
                        object_counts[label] = 1
                # Format results
                formatted_objects = "\n".join([f"- {count}× {label}" for label, count in object_counts.items()])
                return f"OBJECTS DETECTED:\n{formatted_objects}"
            else:
                return "OBJECTS: Could not detect objects (API error)"
        else:
            return "OBJECTS: API key required for object detection"
    except Exception as e:
        return f"Error detecting objects: {str(e)}"


def detect_simple_content(image_content: bytes) -> str:
    """Simple function to detect basic image type when APIs are not available."""
    try:
        image = Image.open(BytesIO(image_content))
        width, height = image.size
        aspect = width / height
        # Very simple heuristics
        if aspect > 2:
            return "panorama or banner image"
        elif aspect < 0.5:
            return "tall or portrait image"
        elif width < 300 or height < 300:
            return "small thumbnail or icon"
        else:
            return "photograph or general image"
    except Exception:
        return "image (could not analyze format)"


import contextlib
from io import StringIO


def python_repl(code: str) -> str:
    """Execute Python code and return the result.

    Args:
        code: Python code to execute
    """
    if not code or not isinstance(code, str):
        return "Error: Please provide valid Python code as a string."
    try:
        # Create a secure dict of globals with limited builtins. Go through
        # the builtins module rather than __builtins__, which is a module in
        # __main__ but a dict in imported modules and so cannot be indexed
        # reliably.
        import builtins
        allowed_names = [
            'abs', 'all', 'any', 'bool', 'chr', 'dict', 'dir', 'divmod',
            'enumerate', 'filter', 'float', 'format', 'frozenset', 'hash',
            'hex', 'int', 'isinstance', 'len', 'list', 'map', 'max',
            'min', 'oct', 'ord', 'pow', 'print', 'range', 'repr',
            'round', 'set', 'slice', 'sorted', 'str', 'sum', 'tuple', 'type', 'zip'
        ]
        restricted_globals = {
            "__builtins__": {
                name: getattr(builtins, name)
                for name in allowed_names if hasattr(builtins, name)
            }
        }
        # Add common math functions
        import math
        for name in ['sin', 'cos', 'tan', 'asin', 'acos', 'atan', 'sqrt',
                     'log', 'log10', 'exp', 'pi', 'e', 'ceil', 'floor', 'degrees', 'radians']:
            if hasattr(math, name):
                restricted_globals[name] = getattr(math, name)
        # Local namespace for variables
        local_vars = {}
        # Capture stdout
        stdout_capture = StringIO()
        # Execute the code
        with contextlib.redirect_stdout(stdout_capture):
            try:
                # Try to evaluate as an expression first
                result = eval(code, restricted_globals, local_vars)
                stdout_content = stdout_capture.getvalue().strip()
                if stdout_content:
                    return f"{stdout_content}\nResult: {result}"
                return f"Result: {result}"
            except SyntaxError:
                # Not an expression, try executing as statements
                exec(code, restricted_globals, local_vars)
                stdout_content = stdout_capture.getvalue().strip()
                if stdout_content:
                    return stdout_content
                return "Code executed successfully with no output."
    except Exception as e:
        return f"Error executing code: {type(e).__name__}: {str(e)}"
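

# Illustrative sketch of python_repl. Never called by the agent itself; safe
# to delete.
def _demo_python_repl() -> None:
    print(python_repl("2 ** 10"))                      # -> "Result: 1024"
    print(python_repl("x = 3\nprint(x * sqrt(16))"))   # -> "12.0"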