|  | import json | 
					
						
						|  | import os | 
					
						
						|  | import shutil | 
					
						
						|  | import textwrap | 
					
						
						|  | from pathlib import Path | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | from smolagents.utils import AgentError | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def serialize_agent_error(obj): | 
					
						
						|  | if isinstance(obj, AgentError): | 
					
						
						|  | return {"error_type": obj.__class__.__name__, "message": obj.message} | 
					
						
						|  | else: | 
					
						
						|  | return str(obj) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_image_description(file_name: str, question: str, visual_inspection_tool) -> str: | 
					
						
						|  | prompt = f"""Write a caption of 5 sentences for this image. Pay special attention to any details that might be useful for someone answering the following question: | 
					
						
						|  | {question}. But do not try to answer the question directly! | 
					
						
						|  | Do not add any information that is not present in the image.""" | 
					
						
						|  | return visual_inspection_tool(image_path=file_name, question=prompt) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_document_description(file_path: str, question: str, document_inspection_tool) -> str: | 
					
						
						|  | prompt = f"""Write a caption of 5 sentences for this document. Pay special attention to any details that might be useful for someone answering the following question: | 
					
						
						|  | {question}. But do not try to answer the question directly! | 
					
						
						|  | Do not add any information that is not present in the document.""" | 
					
						
						|  | return document_inspection_tool.forward_initial_exam_mode(file_path=file_path, question=prompt) | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_single_file_description(file_path: str, question: str, visual_inspection_tool, document_inspection_tool): | 
					
						
						|  | file_extension = file_path.split(".")[-1] | 
					
						
						|  | if file_extension in ["png", "jpg", "jpeg"]: | 
					
						
						|  | file_description = f" - Attached image: {file_path}" | 
					
						
						|  | file_description += ( | 
					
						
						|  | f"\n     -> Image description: {get_image_description(file_path, question, visual_inspection_tool)}" | 
					
						
						|  | ) | 
					
						
						|  | return file_description | 
					
						
						|  | elif file_extension in ["pdf", "xls", "xlsx", "docx", "doc", "xml"]: | 
					
						
						|  | image_path = file_path.split(".")[0] + ".png" | 
					
						
						|  | if os.path.exists(image_path): | 
					
						
						|  | description = get_image_description(image_path, question, visual_inspection_tool) | 
					
						
						|  | file_path = image_path | 
					
						
						|  | else: | 
					
						
						|  | description = get_document_description(file_path, question, document_inspection_tool) | 
					
						
						|  | file_description = f" - Attached document: {file_path}" | 
					
						
						|  | file_description += f"\n     -> File description: {description}" | 
					
						
						|  | return file_description | 
					
						
						|  | elif file_extension in ["mp3", "m4a", "wav"]: | 
					
						
						|  | return f" - Attached audio: {file_path}" | 
					
						
						|  | else: | 
					
						
						|  | return f" - Attached file: {file_path}" | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_zip_description(file_path: str, question: str, visual_inspection_tool, document_inspection_tool): | 
					
						
						|  | folder_path = file_path.replace(".zip", "") | 
					
						
						|  | os.makedirs(folder_path, exist_ok=True) | 
					
						
						|  | shutil.unpack_archive(file_path, folder_path) | 
					
						
						|  |  | 
					
						
						|  | prompt_use_files = "" | 
					
						
						|  | for root, dirs, files in os.walk(folder_path): | 
					
						
						|  | for file in files: | 
					
						
						|  | file_path = os.path.join(root, file) | 
					
						
						|  | prompt_use_files += "\n" + textwrap.indent( | 
					
						
						|  | get_single_file_description(file_path, question, visual_inspection_tool, document_inspection_tool), | 
					
						
						|  | prefix="    ", | 
					
						
						|  | ) | 
					
						
						|  | return prompt_use_files | 
					
						
						|  |  | 
					
						
						|  |  | 
					
						
						|  | def get_tasks_to_run(data, total: int, base_filename: Path, tasks_ids: list[int]): | 
					
						
						|  | f = base_filename.parent / f"{base_filename.stem}_answers.jsonl" | 
					
						
						|  | done = set() | 
					
						
						|  | if f.exists(): | 
					
						
						|  | with open(f, encoding="utf-8") as fh: | 
					
						
						|  | done = {json.loads(line)["task_id"] for line in fh if line.strip()} | 
					
						
						|  |  | 
					
						
						|  | tasks = [] | 
					
						
						|  | for i in range(total): | 
					
						
						|  | task_id = int(data[i]["task_id"]) | 
					
						
						|  | if task_id not in done: | 
					
						
						|  | if tasks_ids is not None: | 
					
						
						|  | if task_id in tasks_ids: | 
					
						
						|  | tasks.append(data[i]) | 
					
						
						|  | else: | 
					
						
						|  | tasks.append(data[i]) | 
					
						
						|  | return tasks | 
					
						
						|  |  |