Spaces:
Sleeping
Sleeping
import json
import uuid
from enum import Enum
from textwrap import dedent
from typing import Any, ClassVar, Dict, List, Optional, Union

from crewai.agents.cache import CacheHandler
from crewai.agents import (
    CacheHandler,
    CrewAgentExecutor,
    CrewAgentOutputParser,
    ToolsHandler,
)
from crewai.tools.agent_tools import AgentTools
from langchain.agents.format_scratchpad import format_log_to_str
from langchain.memory import ConversationSummaryMemory
from langchain.prompts import PromptTemplate
from langchain.tools.render import render_text_description
from langchain_core.runnables.config import RunnableConfig
from langchain_openai import ChatOpenAI
from pydantic import (
    UUID4,
    BaseModel,
    ConfigDict,
    Field,
    InstanceOf,
    Json,
    field_validator,
    model_validator,
)
from pydantic_core import PydanticCustomError

# Content from crew.py
class Gmix(BaseModel):
    """Group of agents, the tasks they work on and the process they follow.

    NOTE(review): the field declarations below evaluate ``Task``, ``Agent``
    and ``Process`` while this class body runs, so those names must already
    be defined at the point where this class is created.

    Attributes:
        tasks: List of tasks.
        agents: List of agents in this crew.
        process: Process that the crew will follow.
        verbose: Verbose mode for the agent execution (``bool`` or ``int``).
        config: Configuration of the crew, as a JSON string or a dict.
        cache_handler: An instance of the CacheHandler class.
        id: Unique identifier for the object, not set by the user.
    """

    __hash__ = object.__hash__
    model_config = ConfigDict(arbitrary_types_allowed=True)
    tasks: List[Task] = Field(description="List of tasks", default_factory=list)
    agents: List[Agent] = Field(
        description="List of agents in this crew.", default_factory=list
    )
    process: Process = Field(
        description="Process that the crew will follow.", default=Process.sequential
    )
    verbose: Union[int, bool] = Field(
        description="Verbose mode for the Agent Execution", default=0
    )
    config: Optional[Union[Json, Dict[str, Any]]] = Field(
        description="Configuration of the crew.", default=None
    )
    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
        default=CacheHandler(), description="An instance of the CacheHandler class."
    )
    id: UUID4 = Field(
        default_factory=uuid.uuid4,
        frozen=True,
        description="Unique identifier for the object, not set by user.",
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Reject any truthy, user-supplied value for ``id``.

        Raises:
            PydanticCustomError: If a value was passed for ``id``.
        """
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "This field is not to be set by the user.", {}
            )

    @field_validator("config", mode="before")
    @classmethod
    def check_config_type(cls, v: Union[Json, Dict[str, Any]]):
        """Decode ``config`` when it is given as raw JSON text.

        ``Json`` is only a type marker, so ``isinstance(v, Json)`` is never
        true for an incoming string; the raw text types are checked instead.

        Returns:
            The decoded JSON value for text input, otherwise ``v`` unchanged.
        """
        if isinstance(v, (str, bytes, bytearray)):
            return json.loads(v)
        return v

    @model_validator(mode="after")
    def check_config(self) -> "Gmix":
        """Validate the crew setup and build agents/tasks from ``config``.

        When ``config`` is set, ``agents`` and ``tasks`` are rebuilt from it;
        each task entry names its agent by role. Afterwards every agent is
        given this crew's cache handler.

        Returns:
            The validated instance.

        Raises:
            PydanticCustomError: If neither config nor agents/tasks are set,
                if config lacks ``agents`` or ``tasks``, or if a task names a
                role that no configured agent has.
        """
        if not self.config and not self.tasks and not self.agents:
            raise PydanticCustomError(
                "missing_keys", "Either agents and task need to be set or config.", {}
            )

        if self.config:
            if not self.config.get("agents") or not self.config.get("tasks"):
                raise PydanticCustomError(
                    "missing_keys_in_config", "Config should have agents and tasks", {}
                )

            self.agents = [Agent(**agent) for agent in self.config["agents"]]

            tasks = []
            for task in self.config["tasks"]:
                # Work on a copy so the caller's config dict is not mutated.
                task_args = dict(task)
                role = task_args.pop("agent", None)
                # First agent with a matching role wins.
                task_agent = next(
                    (agt for agt in self.agents if agt.role == role), None
                )
                if task_agent is None:
                    raise PydanticCustomError(
                        "unknown_agent_in_config",
                        "Config task refers to agent role '{role}' that is not "
                        "defined in the config agents",
                        {"role": str(role)},
                    )
                tasks.append(Task(**task_args, agent=task_agent))
            self.tasks = tasks

        if self.agents:
            for agent in self.agents:
                agent.set_cache_handler(self.cache_handler)
        return self

    def kickoff(self) -> str:
        """Kickoff the crew to work on its tasks.

        Returns:
            Output of the crew (the outcome of the last executed task).

        Raises:
            NotImplementedError: If ``process`` is not a supported process.
        """
        for agent in self.agents:
            agent.cache_handler = self.cache_handler

        if self.process == Process.sequential:
            return self.__sequential_loop()

        # Fail loudly instead of silently returning None.
        raise NotImplementedError(f"Process '{self.process}' is not supported.")

    def __sequential_loop(self) -> str:
        """Run the tasks one after another, feeding each outcome to the next.

        Returns:
            Output of the last task, or ``None`` when there are no tasks.
        """
        task_outcome = None
        for task in self.tasks:
            agent = task.agent
            # A task without an agent skips straight to ``task.execute``,
            # which raises the descriptive "no agent assigned" error.
            if agent is not None:
                # Add delegation tools to the task if the agent allows it
                if agent.allow_delegation:
                    tools = AgentTools(agents=self.agents).tools()
                    # Skip tools already present so repeated kickoffs do not
                    # keep appending the same delegation tools.
                    known_names = {tool.name for tool in task.tools}
                    task.tools += [
                        tool for tool in tools if tool.name not in known_names
                    ]
                self.__log("debug", f"Working Agent: {agent.role}")

            self.__log("info", f"Starting Task: {task.description} ...")
            task_outcome = task.execute(task_outcome)
            self.__log("debug", f"Task output: {task_outcome}")
        return task_outcome

    def __log(self, level, message):
        """Print ``message`` when the crew's verbosity covers ``level``.

        ``verbose=True`` counts as level 2; "debug" needs level 1 or more and
        "info" needs level 2 or more.
        """
        level_map = {"debug": 1, "info": 2}
        verbose_level = (
            2 if isinstance(self.verbose, bool) and self.verbose else self.verbose
        )
        if verbose_level and level_map[level] <= verbose_level:
            print(message)
class Agent(BaseModel):
    """Represents an agent in a system.

    Each agent has a role, a goal, a backstory, and an optional language model (llm).
    The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.

    Attributes:
        agent_executor: An instance of the CrewAgentExecutor class.
        role: The role of the agent.
        goal: The objective of the agent.
        backstory: The backstory of the agent.
        llm: The language model that will run the agent.
        memory: Whether the agent should have memory or not.
        verbose: Whether the agent execution should be in verbose mode.
        allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
        tools: Tools at the agent's disposal.
        tools_handler: An instance of the ToolsHandler class.
        cache_handler: An instance of the CacheHandler class.
    """

    __hash__ = object.__hash__
    model_config = ConfigDict(arbitrary_types_allowed=True)
    id: UUID4 = Field(
        default_factory=uuid.uuid4,
        frozen=True,
        description="Unique identifier for the object, not set by user.",
    )
    role: str = Field(description="Role of the agent")
    goal: str = Field(description="Objective of the agent")
    backstory: str = Field(description="Backstory of the agent")
    llm: Optional[Any] = Field(
        default_factory=lambda: ChatOpenAI(
            temperature=0.7,
            model_name="gpt-4",
        ),
        description="Language model that will run the agent.",
    )
    memory: bool = Field(
        default=True, description="Whether the agent should have memory or not"
    )
    verbose: bool = Field(
        default=False, description="Verbose mode for the Agent Execution"
    )
    allow_delegation: bool = Field(
        default=True, description="Allow delegation of tasks to agents"
    )
    tools: List[Any] = Field(
        default_factory=list, description="Tools at agents disposal"
    )
    agent_executor: Optional[InstanceOf[CrewAgentExecutor]] = Field(
        default=None, description="An instance of the CrewAgentExecutor class."
    )
    tools_handler: Optional[InstanceOf[ToolsHandler]] = Field(
        default=None, description="An instance of the ToolsHandler class."
    )
    cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
        default=CacheHandler(), description="An instance of the CacheHandler class."
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Reject any truthy, user-supplied value for ``id``.

        Raises:
            PydanticCustomError: If a value was passed for ``id``.
        """
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "This field is not to be set by the user.", {}
            )

    @model_validator(mode="after")
    def check_agent_executor(self) -> "Agent":
        """Build the tools handler and executor unless an executor was given.

        Returns:
            The validated instance.
        """
        if not self.agent_executor:
            self.set_cache_handler(self.cache_handler)
        return self

    def execute_task(
        self,
        task: str,
        context: Optional[str] = None,
        tools: Optional[List[Any]] = None,
    ) -> str:
        """Execute a task with the agent.

        Args:
            task: Task to execute.
            context: Context to execute the task in; appended to the task
                text when given.
            tools: Tools to use for the task; falls back to the agent's own
                tools when empty or omitted.

        Returns:
            Output of the agent
        """
        if context:
            task = "\n".join(
                [task, "\nThis is the context you are working with:", context]
            )

        tools = tools or self.tools
        self.agent_executor.tools = tools

        return self.agent_executor.invoke(
            {
                "input": task,
                "tool_names": self.__tools_names(tools),
                "tools": render_text_description(tools),
            },
            RunnableConfig(callbacks=[self.tools_handler]),
        )["output"]

    def set_cache_handler(self, cache_handler) -> None:
        """Store ``cache_handler`` and rebuild the tools handler and executor.

        Args:
            cache_handler: Cache shared by the tools handler and output parser.
        """
        self.cache_handler = cache_handler
        self.tools_handler = ToolsHandler(cache=self.cache_handler)
        self.__create_agent_executor()

    def __create_agent_executor(self) -> None:
        """Create the agent executor and store it on ``self.agent_executor``.

        The prompt with memory is used when ``self.memory`` is set, otherwise
        the plain task execution prompt.
        """
        agent_args = {
            "input": lambda x: x["input"],
            "tools": lambda x: x["tools"],
            "tool_names": lambda x: x["tool_names"],
            "agent_scratchpad": lambda x: format_log_to_str(x["intermediate_steps"]),
        }
        executor_args = {
            "tools": self.tools,
            "verbose": self.verbose,
            "handle_parsing_errors": True,
        }

        if self.memory:
            summary_memory = ConversationSummaryMemory(
                llm=self.llm, memory_key="chat_history", input_key="input"
            )
            executor_args["memory"] = summary_memory
            agent_args["chat_history"] = lambda x: x["chat_history"]
            prompt = Prompts.TASK_EXECUTION_WITH_MEMORY_PROMPT
        else:
            prompt = Prompts.TASK_EXECUTION_PROMPT

        execution_prompt = prompt.partial(
            goal=self.goal,
            role=self.role,
            backstory=self.backstory,
        )

        # Stop generation before the model writes its own "Observation".
        bind = self.llm.bind(stop=["\nObservation"])
        inner_agent = (
            agent_args
            | execution_prompt
            | bind
            | CrewAgentOutputParser(
                tools_handler=self.tools_handler, cache=self.cache_handler
            )
        )
        self.agent_executor = CrewAgentExecutor(agent=inner_agent, **executor_args)

    @staticmethod
    def __tools_names(tools) -> str:
        """Return the tools' ``name`` attributes joined by ``", "``.

        Declared static because it takes no ``self``; it is called as
        ``self.__tools_names(tools)``.
        """
        return ", ".join([t.name for t in tools])
# Content from task.py | |
class Task(BaseModel):
    """Class that represents a task to be executed.

    Attributes:
        description: Description of the actual task.
        agent: Agent responsible for the task (optional).
        tools: Tools the agent is limited to use for this task.
        id: Unique identifier for the object, not set by the user.
    """

    __hash__ = object.__hash__
    description: str = Field(description="Description of the actual task.")
    agent: Optional[Agent] = Field(
        description="Agent responsible for the task.", default=None
    )
    tools: List[Any] = Field(
        default_factory=list,
        description="Tools the agent are limited to use for this task.",
    )
    id: UUID4 = Field(
        default_factory=uuid.uuid4,
        frozen=True,
        description="Unique identifier for the object, not set by user.",
    )

    @field_validator("id", mode="before")
    @classmethod
    def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
        """Reject any truthy, user-supplied value for ``id``.

        Raises:
            PydanticCustomError: If a value was passed for ``id``.
        """
        if v:
            raise PydanticCustomError(
                "may_not_set_field", "This field is not to be set by the user.", {}
            )

    @model_validator(mode="after")
    def check_tools(self) -> "Task":
        """Default the task's tools to its agent's tools when none were given.

        Returns:
            The validated instance.
        """
        if not self.tools and (self.agent and self.agent.tools):
            self.tools.extend(self.agent.tools)
        return self

    def execute(self, context: Optional[str] = None) -> str:
        """Execute the task.

        Args:
            context: Optional context passed on to the agent.

        Returns:
            Output of the task.

        Raises:
            Exception: If no agent is assigned to the task.
        """
        if self.agent:
            return self.agent.execute_task(
                task=self.description, context=context, tools=self.tools
            )
        else:
            raise Exception(
                f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Gmix using a specific process that support that, either consensual or hierarchical."
            )
# Content from process.py | |
class Process(str, Enum):
    """Execution strategies that can be used to tackle a set of tasks.

    Members also subclass ``str``, so each one compares equal to its plain
    string value.
    """

    sequential = "sequential"
    # TODO: consensual = 'consensual'
    # TODO: hierarchical = 'hierarchical'
# Content from prompts.py | |
"""Prompts for generic agent.""" | |
class Prompts(BaseModel):
    """Prompts for generic agent.

    The ``*_SLICE`` attributes are plain text fragments. The ``*_PROMPT``
    attributes are ``PromptTemplate`` objects built from those fragments; the
    fragments are separated by a blank line so their text does not run
    together, except for ``SCRATCHPAD_SLICE``, which already starts with its
    own newline.
    """

    TASK_SLICE: ClassVar[str] = dedent(
        """\
    Begin! This is VERY important to you, your job depends on it!
    Current Task: {input}"""
    )

    SCRATCHPAD_SLICE: ClassVar[str] = "\n{agent_scratchpad}"

    MEMORY_SLICE: ClassVar[str] = dedent(
        """\
    This is the summary of your work so far:
    {chat_history}"""
    )

    ROLE_PLAYING_SLICE: ClassVar[str] = dedent(
        """\
    You are {role}.
    {backstory}
    Your personal goal is: {goal}"""
    )

    TOOLS_SLICE: ClassVar[str] = dedent(
        """\
    TOOLS:
    ------
    You have access to the following tools:
    {tools}
    To use a tool, please use the exact following format:
    ```
    Thought: Do I need to use a tool? Yes
    Action: the action to take, should be one of [{tool_names}], just the name.
    Action Input: the input to the action
    Observation: the result of the action
    ```
    When you have a response for your task, or if you do not need to use a tool, you MUST use the format:
    ```
    Thought: Do I need to use a tool? No
    Final Answer: [your response here]
    ```"""
    )

    VOTING_SLICE: ClassVar[str] = dedent(
        """\
    You are working on a crew with your co-workers and need to decide who will execute the task.
    These are your format instructions:
    {format_instructions}
    These are your co-workers and their roles:
    {coworkers}"""
    )

    # The slices carry no leading/trailing blank lines, so plain "+" would
    # glue e.g. "{goal}" directly onto "TOOLS:"; join them with a blank line.
    TASK_EXECUTION_WITH_MEMORY_PROMPT: ClassVar[PromptTemplate] = (
        PromptTemplate.from_template(
            "\n\n".join([ROLE_PLAYING_SLICE, TOOLS_SLICE, MEMORY_SLICE, TASK_SLICE])
            + SCRATCHPAD_SLICE
        )
    )

    TASK_EXECUTION_PROMPT: ClassVar[PromptTemplate] = PromptTemplate.from_template(
        "\n\n".join([ROLE_PLAYING_SLICE, TOOLS_SLICE, TASK_SLICE]) + SCRATCHPAD_SLICE
    )

    CONSENSUNS_VOTING_PROMPT: ClassVar[PromptTemplate] = PromptTemplate.from_template(
        "\n\n".join([ROLE_PLAYING_SLICE, VOTING_SLICE, TASK_SLICE]) + SCRATCHPAD_SLICE
    )