Spaces:
Sleeping
Sleeping
| # coding=utf-8 | |
| # Copyright 2024 HuggingFace Inc. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import unittest | |
| from smolagents import ( | |
| AgentError, | |
| AgentImage, | |
| CodeAgent, | |
| ToolCallingAgent, | |
| stream_to_gradio, | |
| ) | |
| from smolagents.models import ( | |
| ChatMessage, | |
| ChatMessageToolCall, | |
| ChatMessageToolCallDefinition, | |
| ) | |
| from smolagents.monitoring import AgentLogger, LogLevel | |
| class FakeLLMModel: | |
| def __init__(self): | |
| self.last_input_token_count = 10 | |
| self.last_output_token_count = 20 | |
| def __call__(self, prompt, tools_to_call_from=None, **kwargs): | |
| if tools_to_call_from is not None: | |
| return ChatMessage( | |
| role="assistant", | |
| content="", | |
| tool_calls=[ | |
| ChatMessageToolCall( | |
| id="fake_id", | |
| type="function", | |
| function=ChatMessageToolCallDefinition(name="final_answer", arguments={"answer": "image"}), | |
| ) | |
| ], | |
| ) | |
| else: | |
| return ChatMessage( | |
| role="assistant", | |
| content=""" | |
| Code: | |
| ```py | |
| final_answer('This is the final answer.') | |
| ```""", | |
| ) | |
| class MonitoringTester(unittest.TestCase): | |
| def test_code_agent_metrics(self): | |
| agent = CodeAgent( | |
| tools=[], | |
| model=FakeLLMModel(), | |
| max_steps=1, | |
| ) | |
| agent.run("Fake task") | |
| self.assertEqual(agent.monitor.total_input_token_count, 10) | |
| self.assertEqual(agent.monitor.total_output_token_count, 20) | |
| def test_toolcalling_agent_metrics(self): | |
| agent = ToolCallingAgent( | |
| tools=[], | |
| model=FakeLLMModel(), | |
| max_steps=1, | |
| ) | |
| agent.run("Fake task") | |
| self.assertEqual(agent.monitor.total_input_token_count, 10) | |
| self.assertEqual(agent.monitor.total_output_token_count, 20) | |
| def test_code_agent_metrics_max_steps(self): | |
| class FakeLLMModelMalformedAnswer: | |
| def __init__(self): | |
| self.last_input_token_count = 10 | |
| self.last_output_token_count = 20 | |
| def __call__(self, prompt, **kwargs): | |
| return ChatMessage(role="assistant", content="Malformed answer") | |
| agent = CodeAgent( | |
| tools=[], | |
| model=FakeLLMModelMalformedAnswer(), | |
| max_steps=1, | |
| ) | |
| agent.run("Fake task") | |
| self.assertEqual(agent.monitor.total_input_token_count, 20) | |
| self.assertEqual(agent.monitor.total_output_token_count, 40) | |
| def test_code_agent_metrics_generation_error(self): | |
| class FakeLLMModelGenerationException: | |
| def __init__(self): | |
| self.last_input_token_count = 10 | |
| self.last_output_token_count = 20 | |
| def __call__(self, prompt, **kwargs): | |
| self.last_input_token_count = 10 | |
| self.last_output_token_count = 0 | |
| raise Exception("Cannot generate") | |
| agent = CodeAgent( | |
| tools=[], | |
| model=FakeLLMModelGenerationException(), | |
| max_steps=1, | |
| ) | |
| agent.run("Fake task") | |
| self.assertEqual(agent.monitor.total_input_token_count, 20) # Should have done two monitoring callbacks | |
| self.assertEqual(agent.monitor.total_output_token_count, 0) | |
| def test_streaming_agent_text_output(self): | |
| agent = CodeAgent( | |
| tools=[], | |
| model=FakeLLMModel(), | |
| max_steps=1, | |
| ) | |
| # Use stream_to_gradio to capture the output | |
| outputs = list(stream_to_gradio(agent, task="Test task")) | |
| self.assertEqual(len(outputs), 7) | |
| final_message = outputs[-1] | |
| self.assertEqual(final_message.role, "assistant") | |
| self.assertIn("This is the final answer.", final_message.content) | |
| def test_streaming_agent_image_output(self): | |
| agent = ToolCallingAgent( | |
| tools=[], | |
| model=FakeLLMModel(), | |
| max_steps=1, | |
| ) | |
| # Use stream_to_gradio to capture the output | |
| outputs = list( | |
| stream_to_gradio( | |
| agent, | |
| task="Test task", | |
| additional_args=dict(image=AgentImage(value="path.png")), | |
| ) | |
| ) | |
| self.assertEqual(len(outputs), 5) | |
| final_message = outputs[-1] | |
| self.assertEqual(final_message.role, "assistant") | |
| self.assertIsInstance(final_message.content, dict) | |
| self.assertEqual(final_message.content["path"], "path.png") | |
| self.assertEqual(final_message.content["mime_type"], "image/png") | |
| def test_streaming_with_agent_error(self): | |
| logger = AgentLogger(level=LogLevel.INFO) | |
| def dummy_model(prompt, **kwargs): | |
| raise AgentError("Simulated agent error", logger) | |
| agent = CodeAgent( | |
| tools=[], | |
| model=dummy_model, | |
| max_steps=1, | |
| ) | |
| # Use stream_to_gradio to capture the output | |
| outputs = list(stream_to_gradio(agent, task="Test task")) | |
| self.assertEqual(len(outputs), 9) | |
| final_message = outputs[-1] | |
| self.assertEqual(final_message.role, "assistant") | |
| self.assertIn("Simulated agent error", final_message.content) | |