Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os | |
| import random | |
| import sys | |
| import textwrap | |
| import threading | |
| import time | |
| from ast import literal_eval | |
| import autogen | |
| import chromadb | |
| import isort | |
| import panel as pn | |
| from autogen import Agent, AssistantAgent, UserProxyAgent | |
| from autogen.agentchat.contrib.compressible_agent import CompressibleAgent | |
| from autogen.agentchat.contrib.gpt_assistant_agent import GPTAssistantAgent | |
| from autogen.agentchat.contrib.llava_agent import LLaVAAgent | |
| from autogen.agentchat.contrib.math_user_proxy_agent import MathUserProxyAgent | |
| from autogen.agentchat.contrib.retrieve_assistant_agent import RetrieveAssistantAgent | |
| from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent | |
| from autogen.agentchat.contrib.teachable_agent import TeachableAgent | |
| from autogen.code_utils import extract_code | |
| from configs import ( | |
| DEFAULT_AUTO_REPLY, | |
| DEFAULT_SYSTEM_MESSAGE, | |
| Q1, | |
| Q2, | |
| Q3, | |
| TIMEOUT, | |
| TITLE, | |
| ) | |
try:
    from termcolor import colored
except ImportError:
    # termcolor is an optional dependency; without it, console output is
    # simply left uncoloured.

    def colored(x, *args, **kwargs):
        """Return ``x`` unchanged, ignoring any colour/attribute arguments."""
        return x
def get_retrieve_config(docs_path, model_name, collection_name):
    """Build the ``retrieve_config`` dict for a retrieval agent.

    Args:
        docs_path: Python-literal text (parsed with ``ast.literal_eval``)
            describing the documents location(s).
        model_name: Value stored under the ``"model"`` key.
        collection_name: Value stored under the ``"collection_name"`` key.

    Returns:
        A dict with the parsed docs path, fixed chunking/embedding settings
        and a persistent chromadb client rooted at ``.chromadb``.
    """
    # Parse first so that a malformed docs_path fails before a client is created.
    parsed_docs_path = literal_eval(docs_path)
    chroma_client = chromadb.PersistentClient(path=".chromadb")
    return dict(
        docs_path=parsed_docs_path,
        chunk_token_size=1000,
        model=model_name,
        embedding_model="all-mpnet-base-v2",
        get_or_create=True,
        client=chroma_client,
        collection_name=collection_name,
    )
| # autogen.ChatCompletion.start_logging() | |
def termination_msg(x):
    """Check if a message is a termination message.

    A message terminates the chat when it is a dict whose ``"content"``,
    upper-cased and stripped of surrounding whitespace and periods, starts
    or ends with ``TERMINATE``.

    Args:
        x: The message to inspect; anything that is not a dict is never a
            termination message.

    Returns:
        True if the message signals termination, False otherwise.
    """
    # Guard first: calling ``.get`` on a non-dict would raise AttributeError.
    if not isinstance(x, dict):
        return False
    text = str(x.get("content", "")).upper().strip().strip(".")
    return text.endswith("TERMINATE") or text.startswith("TERMINATE")
| def _is_termination_msg(message): | |
| """Check if a message is a termination message. | |
| Terminate when no code block is detected. Currently only detect python code blocks. | |
| """ | |
| if isinstance(message, dict): | |
| message = message.get("content") | |
| if message is None: | |
| return False | |
| cb = extract_code(message) | |
| contain_code = False | |
| for c in cb: | |
| # todo: support more languages | |
| if c[0] == "python": | |
| contain_code = True | |
| break | |
| return not contain_code | |
def new_generate_oai_reply(
    self,
    messages=None,
    sender=None,
    config=None,
):
    """Generate a reply using autogen.oai, with strictly alternating roles.

    Every message after the system message is relabelled so that roles
    alternate ``user`` / ``assistant`` (``function`` messages in a user slot
    keep their role), and a default user message is appended when the
    sequence would otherwise end on an assistant turn.
    See https://cloud.baidu.com/doc/WENXINWORKSHOP/s/tlmyncueh (error 336006).

    Args:
        self: The agent this reply function is registered on.
        messages: Conversation history; defaults to the history with ``sender``.
        sender: The agent whose history is used when ``messages`` is None.
        config: Optional client that overrides ``self.client``.

    Returns:
        ``(False, None)`` when no client is available, otherwise
        ``(True, reply)`` where ``reply`` is the first text or function call
        extracted from the response.
    """
    client = self.client if config is None else config
    if client is None:
        return False, None
    if messages is None:
        messages = self._oai_messages[sender]

    # "context" is passed to the client separately and removed from the stored
    # message so it is not sent as part of the message payload.
    _context = messages[-1].pop("context", None) if messages else None

    # Rewrite roles on shallow copies so the stored conversation history keeps
    # its original roles; only the outgoing request is normalised.
    _messages = [dict(msg) for msg in self._oai_system_message + messages]
    for idx, msg in enumerate(_messages):
        if idx == 0:
            # Index 0 is the system message; leave it untouched.
            continue
        if idx % 2 == 1:
            if msg.get("role") != "function":
                msg["role"] = "user"
        else:
            msg["role"] = "assistant"

    # An odd total means the last turn is an assistant turn: end on a user turn.
    if len(_messages) % 2 == 1:
        _messages.append({"content": DEFAULT_AUTO_REPLY, "role": "user"})

    response = client.create(context=_context, messages=_messages)
    return True, client.extract_text_or_function_call(response)[0]
def initialize_agents(
    llm_config,
    agent_name,
    system_msg,
    agent_type,
    retrieve_config=None,
    code_execution_config=False,
):
    """Create an agent of the requested type.

    Args:
        llm_config: LLM configuration passed to LLM-backed agents. Its first
            ``config_list`` entry decides whether the ERNIE Bot reply hack is
            applied; a missing/empty configuration simply skips the hack.
        agent_name: Agent name; surrounding whitespace is stripped.
        system_msg: System message; surrounding whitespace is stripped. For
            assistant-like agents an empty message falls back to
            ``DEFAULT_SYSTEM_MESSAGE``.
        agent_type: Class name of the agent to build. Types other than
            ``RetrieveUserProxyAgent``, ``GPTAssistantAgent`` and
            ``CompressibleAgent`` are looked up by name in this module's
            globals.
        retrieve_config: Retrieval settings, used by ``RetrieveUserProxyAgent``.
        code_execution_config: Code-execution settings for user-proxy agents;
            ``False`` disables code execution.

    Returns:
        The constructed agent.

    Raises:
        KeyError: If ``agent_type`` is not a name defined in this module.
    """
    agent_name = agent_name.strip()
    system_msg = system_msg.strip()
    effective_system_msg = system_msg if system_msg else DEFAULT_SYSTEM_MESSAGE

    if "RetrieveUserProxyAgent" == agent_type:
        agent = RetrieveUserProxyAgent(
            name=agent_name,
            system_message=system_msg,
            is_termination_msg=_is_termination_msg,
            human_input_mode="TERMINATE",
            max_consecutive_auto_reply=5,
            retrieve_config=retrieve_config,
            code_execution_config=code_execution_config,  # set to False if you don't want to execute the code
            default_auto_reply=DEFAULT_AUTO_REPLY,
        )
    elif "GPTAssistantAgent" == agent_type:
        agent = GPTAssistantAgent(
            name=agent_name,
            instructions=effective_system_msg,
            llm_config=llm_config,
            is_termination_msg=termination_msg,
        )
    elif "CompressibleAgent" == agent_type:
        compress_config = {
            "mode": "COMPRESS",
            "trigger_count": 600,  # set this to a large number for less frequent compression
            "verbose": True,  # to allow printing of compression information: context before and after compression
            "leave_last_n": 2,
        }
        agent = CompressibleAgent(
            name=agent_name,
            system_message=effective_system_msg,
            llm_config=llm_config,
            compress_config=compress_config,
            is_termination_msg=termination_msg,
        )
    elif "UserProxy" in agent_type:
        # NOTE(review): the class is resolved by name from module globals, so
        # agent_type must come from a trusted, fixed list of choices.
        agent = globals()[agent_type](
            name=agent_name,
            is_termination_msg=termination_msg,
            human_input_mode="TERMINATE",
            system_message=system_msg,
            default_auto_reply=DEFAULT_AUTO_REPLY,
            max_consecutive_auto_reply=5,
            code_execution_config=code_execution_config,
        )
    else:
        agent = globals()[agent_type](
            name=agent_name,
            is_termination_msg=termination_msg,
            human_input_mode="NEVER",
            system_message=effective_system_msg,
            llm_config=llm_config,
        )

    # Only the first configured model is inspected. Tolerate a missing or
    # empty llm_config (e.g. retrieval proxies that do not use an LLM).
    config_list = (llm_config or {}).get("config_list") or []
    first_model = str(config_list[0].get("model", "")) if config_list else ""
    if "ernie" in first_model.lower():
        # Hack for ERNIE Bot models: drop the last entry of the reply-function
        # list and register the role-alternating reply function instead.
        agent._reply_func_list.pop(-1)
        agent.register_reply([Agent, None], new_generate_oai_reply, -1)
    return agent
async def get_human_input(name, prompt: str, instance=None) -> str:
    """Get human input.

    Without ``instance`` the prompt is read from the console. Otherwise a
    text area and a submit checkbox are sent to ``instance`` (presumably a
    Panel chat interface -- confirm against the caller) and polled every
    0.1 s until the user submits non-empty text or ``TIMEOUT`` seconds pass.

    Args:
        name: User name under which widgets/messages are sent to ``instance``.
        prompt: Prompt shown on the console or as the text-area placeholder.
        instance: Object exposing ``send(obj, user=..., respond=...)``.

    Returns:
        The submitted text, or ``"exit"`` when the wait timed out.
    """
    if instance is None:
        return input(prompt)

    text_box = pn.widgets.TextAreaInput(
        placeholder=prompt, name="", sizing_mode="stretch_width"
    )
    submit_box = pn.widgets.Checkbox(name="Check to Submit Feedback")
    instance.send(pn.Row(text_box, submit_box), user=name, respond=False)

    started = time.time()
    while time.time() - started <= TIMEOUT:
        if text_box.value != "" and submit_box.value is True:
            # Lock the widget so the submitted text cannot be edited afterwards.
            text_box.disabled = True
            return text_box.value
        await asyncio.sleep(0.1)

    instance.send(
        f"You didn't provide your feedback in {TIMEOUT} seconds, exit.",
        user=name,
        respond=False,
    )
    return "exit"
async def check_termination_and_human_reply(
    self,
    messages=None,
    sender=None,
    config=None,
    instance=None,
):
    """Check if the conversation should be terminated, and if human reply is provided.

    Args:
        self: The agent this reply function runs for.
        messages: Conversation history; defaults to the history with ``sender``.
        sender: The agent that sent the last message.
        config: Unused.
        instance: Forwarded to ``get_human_input`` when feedback is requested.

    Returns:
        ``(True, None)`` to stop the conversation, ``(True, reply)`` to send
        the human reply, or ``(False, None)`` to fall through to auto-reply.
    """
    if messages is None:
        messages = self._oai_messages[sender]
    last_message = messages[-1]
    auto_reply_counts = self._consecutive_auto_reply_counter
    auto_reply_limits = self._max_consecutive_auto_reply_dict

    reply = ""
    missing_input_notice = ""

    if self.human_input_mode == "ALWAYS":
        reply = await get_human_input(
            self.name,
            f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: ",
            instance,
        )
        if not reply:
            missing_input_notice = "NO HUMAN INPUT RECEIVED."
            # Empty input on a termination message ends the conversation.
            if self._is_termination_msg(last_message):
                reply = "exit"
    elif auto_reply_counts[sender] >= auto_reply_limits[sender]:
        # The auto-reply budget for this sender is used up.
        if self.human_input_mode == "NEVER":
            reply = "exit"
        else:
            # self.human_input_mode == "TERMINATE":
            terminate = self._is_termination_msg(last_message)
            if terminate:
                prompt = f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
            else:
                prompt = f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: "
            reply = await get_human_input(self.name, prompt, instance)
            if not reply:
                missing_input_notice = "NO HUMAN INPUT RECEIVED."
                # Empty input on a termination message ends the conversation.
                if terminate:
                    reply = "exit"
    elif self._is_termination_msg(last_message):
        if self.human_input_mode == "NEVER":
            reply = "exit"
        else:
            # self.human_input_mode == "TERMINATE":
            reply = await get_human_input(
                self.name,
                f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: ",
                instance,
            )
            if not reply:
                missing_input_notice = "NO HUMAN INPUT RECEIVED."
                reply = "exit"

    if missing_input_notice:
        print(colored(f"\n>>>>>>>> {missing_input_notice}", "red"), flush=True)

    # Stop the conversation.
    if reply == "exit":
        auto_reply_counts[sender] = 0
        return True, None

    # Send the human reply (also taken when auto-replies are disabled entirely).
    if reply or auto_reply_limits[sender] == 0:
        auto_reply_counts[sender] = 0
        return True, reply

    # No human reply: count one more consecutive auto-reply.
    auto_reply_counts[sender] += 1
    if self.human_input_mode != "NEVER":
        print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)
    return False, None
async def format_code(code_to_format: str) -> str:
    """Sort the imports of a code snippet with isort (black-compatible profile).

    The snippet is written to a uniquely named temporary ``.py`` file in the
    current directory, processed in place by ``isort.file`` and read back.

    Args:
        code_to_format: Python source code to process.

    Returns:
        The source code with its imports sorted and floated to the top.
    """
    filename = f"temp_code_{int(time.time())}_{random.randint(10000, 99999)}.py"
    try:
        # Explicit UTF-8 avoids depending on the platform's locale encoding
        # for non-ASCII system messages.
        with open(filename, "w", encoding="utf-8") as file:
            file.write(code_to_format)
        isort.file(
            filename, profile="black", known_first_party=["autogen"], float_to_top=True
        )
        with open(filename, "r", encoding="utf-8") as file:
            return file.read()
    finally:
        # Always clean up, even when writing or isort fails.
        if os.path.exists(filename):
            os.remove(filename)
def _triple_quoted(text):
    """Render ``text`` as a triple-double-quoted Python string literal.

    Backslashes and double quotes are escaped so arbitrary text cannot close
    the literal early or be re-interpreted as an escape sequence.
    """
    escaped = str(text).replace("\\", "\\\\").replace('"', '\\"')
    return '"""' + escaped + '"""'


def _agent_snippet(agent):
    """Return the source code that recreates ``agent`` in the generated script."""
    name = repr(agent.name)
    system_message = _triple_quoted(agent.system_message)
    auto_reply = repr(DEFAULT_AUTO_REPLY)

    if isinstance(agent, RetrieveUserProxyAgent):
        # Work on a copy: the live agent must keep its real chromadb client.
        retrieve_config = dict(agent._retrieve_config)
        client_marker = 'chromadb.PersistentClient(path=".chromadb")'
        retrieve_config["client"] = client_marker
        # Turn the quoted marker back into a real constructor call.
        rendered_config = repr(retrieve_config).replace(
            repr(client_marker), "chromadb.PersistentClient(path='.chromadb')"
        )
        lines = [
            "from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent",
            "import chromadb",
            "",
            "agent = RetrieveUserProxyAgent(",
            f"    name={name},",
            f"    system_message={system_message},",
            "    is_termination_msg=_is_termination_msg,",
            '    human_input_mode="TERMINATE",',
            "    max_consecutive_auto_reply=5,",
            f"    retrieve_config={rendered_config},",
            f"    code_execution_config={agent._code_execution_config},  # set to False if you don't want to execute the code",
            f"    default_auto_reply={auto_reply},",
            ")",
        ]
    elif isinstance(agent, GPTAssistantAgent):
        lines = [
            "from autogen.agentchat.contrib.gpt_assistant_agent import GPTAssistantAgent",
            "",
            "agent = GPTAssistantAgent(",
            f"    name={name},",
            f"    instructions={system_message},",
            "    llm_config=llm_config,",
            "    is_termination_msg=termination_msg,",
            ")",
        ]
    elif isinstance(agent, CompressibleAgent):
        lines = [
            "from autogen.agentchat.contrib.compressible_agent import CompressibleAgent",
            "",
            "compress_config = {",
            '    "mode": "COMPRESS",',
            '    "trigger_count": 600,  # set this to a large number for less frequent compression',
            '    "verbose": True,  # to allow printing of compression information: contex before and after compression',
            '    "leave_last_n": 2,',
            "}",
            "",
            "agent = CompressibleAgent(",
            f"    name={name},",
            f"    system_message={system_message},",
            "    llm_config=llm_config,",
            "    compress_config=compress_config,",
            "    is_termination_msg=termination_msg,",
            ")",
        ]
    elif isinstance(agent, UserProxyAgent):
        # Keep the math proxy's concrete type so the generated script starts
        # the chat with ``problem=`` just like the live session.
        if isinstance(agent, MathUserProxyAgent):
            import_line = "from autogen.agentchat.contrib.math_user_proxy_agent import MathUserProxyAgent"
            class_name = "MathUserProxyAgent"
        else:
            import_line = "from autogen import UserProxyAgent"
            class_name = "UserProxyAgent"
        lines = [
            import_line,
            "",
            f"agent = {class_name}(",
            f"    name={name},",
            "    is_termination_msg=termination_msg,",
            '    human_input_mode="TERMINATE",',
            f"    system_message={system_message},",
            f"    default_auto_reply={auto_reply},",
            "    max_consecutive_auto_reply=5,",
            f"    code_execution_config={agent._code_execution_config},",
            ")",
        ]
    elif isinstance(agent, RetrieveAssistantAgent):
        lines = [
            "from autogen.agentchat.contrib.retrieve_assistant_agent import RetrieveAssistantAgent",
            "",
            "agent = RetrieveAssistantAgent(",
            f"    name={name},",
            f"    system_message={system_message},",
            "    llm_config=llm_config,",
            "    is_termination_msg=termination_msg,",
            ")",
        ]
    elif isinstance(agent, AssistantAgent):
        lines = [
            "from autogen import AssistantAgent",
            "",
            "agent = AssistantAgent(",
            f"    name={name},",
            f"    system_message={system_message},",
            "    llm_config=llm_config,",
            "    is_termination_msg=termination_msg,",
            ")",
        ]
    else:
        # Any other agent class: mirror the generic construction used by
        # initialize_agents instead of reusing a previous agent's snippet.
        agent_cls = type(agent)
        lines = [
            f"from {agent_cls.__module__} import {agent_cls.__name__}",
            "",
            f"agent = {agent_cls.__name__}(",
            f"    name={name},",
            "    is_termination_msg=termination_msg,",
            '    human_input_mode="NEVER",',
            f"    system_message={system_message},",
            "    llm_config=llm_config,",
            ")",
        ]
    return "\n".join(lines) + "\n"


async def generate_code(agents, manager, contents, code_editor, groupchat):
    """Generate a standalone script reproducing the current chat setup.

    The script recreates every agent, picks the initial sender (the first
    user-proxy agent, else the first agent), optionally wraps the agents in
    a group chat, and starts the chat with ``contents``. The result is run
    through ``format_code`` and assigned to ``code_editor.value``.

    Args:
        agents: Live agent objects to reproduce.
        manager: Truthy when the chat runs through a group-chat manager.
        contents: The initial message / problem text.
        code_editor: Object whose ``value`` attribute receives the script.
        groupchat: Group chat supplying ``speaker_selection_method``; only
            read when ``manager`` is truthy.
    """
    preamble = """import autogen
import os
from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent
from autogen.agentchat.contrib.math_user_proxy_agent import MathUserProxyAgent
from autogen.code_utils import extract_code

config_list = autogen.config_list_from_json(
    "OAI_CONFIG_LIST",
    file_location=".",
)
if not config_list:
    os.environ["MODEL"] = "<your model name>"
    os.environ["OPENAI_API_KEY"] = "<your openai api key>"
    os.environ["OPENAI_BASE_URL"] = "<your openai base url>"  # optional

    config_list = autogen.config_list_from_models(
        model_list=[os.environ.get("MODEL", "gpt-35-turbo")],
    )

llm_config = {
    "timeout": 60,
    "cache_seed": 42,
    "config_list": config_list,
    "temperature": 0,
}

def termination_msg(x):
    _msg = str(x.get("content", "")).upper().strip().strip("\\n").strip(".")
    return isinstance(x, dict) and (_msg.endswith("TERMINATE") or _msg.startswith("TERMINATE"))

def _is_termination_msg(message):
    if isinstance(message, dict):
        message = message.get("content")
        if message is None:
            return False
    cb = extract_code(message)
    contain_code = False
    for c in cb:
        # todo: support more languages
        if c[0] == "python":
            contain_code = True
            break
    return not contain_code

agents = []

"""
    parts = [preamble]
    parts.extend(
        _agent_snippet(agent) + "\n" + "agents.append(agent)\n\n" for agent in agents
    )

    parts.append(
        """
init_sender = None
for agent in agents:
    if "UserProxy" in str(type(agent)):
        init_sender = agent
        break

if not init_sender:
    init_sender = agents[0]

"""
    )

    if manager:
        parts.append(
            f"""
groupchat = autogen.GroupChat(
    agents=agents, messages=[], max_round=12, speaker_selection_method={groupchat.speaker_selection_method!r}, allow_repeat_speaker=False
)  # todo: auto, sometimes message has no name
manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)

recipient = manager
"""
        )
    else:
        parts.append(
            """
recipient = agents[1] if agents[1] != init_sender else agents[0]
"""
        )

    # repr() keeps quotes, backslashes and newlines in the user's text from
    # breaking (or injecting code into) the generated script.
    parts.append(
        f"""
if isinstance(init_sender, (RetrieveUserProxyAgent, MathUserProxyAgent)):
    init_sender.initiate_chat(recipient, problem={contents!r})
else:
    init_sender.initiate_chat(recipient, message={contents!r})
"""
    )

    code = textwrap.dedent("".join(parts))
    code_editor.value = await format_code(code)