import base64
import json
import mimetypes
import os
import uuid
from io import BytesIO

import PIL.Image
import requests
from dotenv import load_dotenv
from huggingface_hub import InferenceClient

from smolagents import Tool, tool


load_dotenv(override=True)

def process_images_and_text(image_path, query, client):
    # transformers is only needed here, to build the idefics2 chat prompt.
    from transformers import AutoProcessor

    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image"},
                {"type": "text", "text": query},
            ],
        },
    ]
    idefics_processor = AutoProcessor.from_pretrained("HuggingFaceM4/idefics2-8b-chatty")
    prompt_with_template = idefics_processor.apply_chat_template(messages, add_generation_prompt=True)

    # Encode the local image as a base64 data URL that the inference endpoint can consume.
    def encode_local_image(image_path):
        # Load the image and normalize it to RGB.
        image = PIL.Image.open(image_path).convert("RGB")

        # Convert the image to a base64 string.
        buffer = BytesIO()
        image.save(buffer, format="JPEG")
        base64_image = base64.b64encode(buffer.getvalue()).decode("utf-8")

        # Add the string formatting required by the endpoint.
        image_string = f"data:image/jpeg;base64,{base64_image}"

        return image_string

    image_string = encode_local_image(image_path)
    # Substitute the image placeholder with a markdown image pointing at the base64 data URL.
    prompt_with_images = prompt_with_template.replace("<image>", "![]({}) ").format(image_string)

    payload = {
        "inputs": prompt_with_images,
        "parameters": {
            "return_full_text": False,
            "max_new_tokens": 200,
        },
    }

    return json.loads(client.post(json=payload).decode())[0]

# Encode an image as base64, downloading it first if a URL was passed.
def encode_image(image_path):
    if image_path.startswith("http"):
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
        request_kwargs = {
            "headers": {"User-Agent": user_agent},
            "stream": True,
        }

        # Send an HTTP request to the URL.
        response = requests.get(image_path, **request_kwargs)
        response.raise_for_status()
        content_type = response.headers.get("content-type", "")

        extension = mimetypes.guess_extension(content_type)
        if extension is None:
            extension = ".download"

        fname = str(uuid.uuid4()) + extension
        download_path = os.path.abspath(os.path.join("downloads", fname))
        # Make sure the downloads directory exists before writing.
        os.makedirs(os.path.dirname(download_path), exist_ok=True)

        with open(download_path, "wb") as fh:
            for chunk in response.iter_content(chunk_size=512):
                fh.write(chunk)

        image_path = download_path

    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")

def resize_image(image_path):
    img = PIL.Image.open(image_path)
    width, height = img.size
    img = img.resize((int(width / 2), int(height / 2)))
    # Prefix the file name (not the whole path) so the resized copy lands next to the original.
    new_image_path = os.path.join(os.path.dirname(image_path), f"resized_{os.path.basename(image_path)}")
    img.save(new_image_path)
    return new_image_path

class VisualQATool(Tool):
    name = "visualizer"
    description = "A tool that can answer questions about attached images."
    inputs = {
        "image_path": {
            "description": "The path to the image on which to answer the question",
            "type": "string",
        },
        "question": {"description": "the question to answer", "type": "string", "nullable": True},
    }
    output_type = "string"

    client = InferenceClient("HuggingFaceM4/idefics2-8b-chatty")

    def forward(self, image_path: str, question: str | None = None) -> str:
        output = ""
        add_note = False
        if not question:
            add_note = True
            question = "Please write a detailed caption for this image."
        try:
            output = process_images_and_text(image_path, question, self.client)
        except Exception as e:
            print(e)
            if "Payload Too Large" in str(e):
                new_image_path = resize_image(image_path)
                output = process_images_and_text(new_image_path, question, self.client)

        if add_note:
            output = (
                f"You did not provide a particular question, so here is a detailed caption for the image: {output}"
            )

        return output

@tool
def visualizer(image_path: str, question: str | None = None) -> str:
    """A tool that can answer questions about attached images.

    Args:
        image_path: The path to the image on which to answer the question. This should be a local path to a downloaded image.
        question: The question to answer.
    """
    # Local imports keep the decorated tool function self-contained.
    import mimetypes
    import os

    import requests

    from .visual_qa import encode_image

    add_note = False
    if not question:
        add_note = True
        question = "Please write a detailed caption for this image."
    if not isinstance(image_path, str):
        raise Exception("You should provide at least `image_path` string argument to this tool!")

    mime_type, _ = mimetypes.guess_type(image_path)
    base64_image = encode_image(image_path)

    payload = {
        "model": "gpt-4o",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": question},
                    {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                ],
            }
        ],
        "max_tokens": 1000,
    }
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"}
    response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
    try:
        output = response.json()["choices"][0]["message"]["content"]
    except Exception:
        raise Exception(f"Response format unexpected: {response.json()}")

    if add_note:
        output = f"You did not provide a particular question, so here is a detailed caption for the image: {output}"

    return output
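

# Illustrative usage sketch (not part of the original module). It assumes an image exists at a
# hypothetical local path and that OPENAI_API_KEY is available in the environment; run it as part
# of its package (e.g. `python -m <your_package>.visual_qa`), since `visualizer` uses a relative import.
if __name__ == "__main__":
    example_image = "downloads/example.jpg"  # hypothetical path; substitute a real image file
    if os.path.exists(example_image):
        # Ask the GPT-4o-backed tool a specific question about the image.
        print(visualizer(image_path=example_image, question="What is shown in this image?"))
        # Or request a detailed caption from the idefics2-backed Tool class.
        print(VisualQATool().forward(example_image))
    else:
        print(f"Place an image at {example_image} to run this example.")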