Spaces:
Running
Running
import os | |
import pprint | |
import re | |
from typing import List, Optional, Union | |
from qwen_agent import Agent, MultiAgentHub | |
from qwen_agent.agents.user_agent import PENDING_USER_INPUT | |
from qwen_agent.gui.gradio_utils import format_cover_html | |
from qwen_agent.gui.utils import convert_fncall_to_text, convert_history_to_chatbot, get_avatar_image | |
from qwen_agent.llm.schema import CONTENT, FILE, IMAGE, NAME, ROLE, USER, Message | |
from qwen_agent.log import logger | |
from qwen_agent.utils.utils import print_traceback | |
from patching import common_programming_language_extensions | |
class WebUI: | |
"""A Common chatbot application for agent.""" | |
def __init__(self, agent: Union[Agent, MultiAgentHub, List[Agent]], chatbot_config: Optional[dict] = None): | |
""" | |
Initialization the chatbot. | |
Args: | |
agent: The agent or a list of agents, | |
supports various types of agents such as Assistant, GroupChat, Router, etc. | |
chatbot_config: The chatbot configuration. | |
Set the configuration as {'user.name': '', 'user.avatar': '', 'agent.avatar': '', 'input.placeholder': '', 'prompt.suggestions': []}. | |
""" | |
chatbot_config = chatbot_config or {} | |
if isinstance(agent, MultiAgentHub): | |
self.agent_list = [agent for agent in agent.nonuser_agents] | |
self.agent_hub = agent | |
elif isinstance(agent, list): | |
self.agent_list = agent | |
self.agent_hub = None | |
else: | |
self.agent_list = [agent] | |
self.agent_hub = None | |
user_name = chatbot_config.get('user.name', 'user') | |
self.user_config = { | |
'name': user_name, | |
'avatar': chatbot_config.get( | |
'user.avatar', | |
get_avatar_image(user_name), | |
), | |
} | |
self.agent_config_list = [{ | |
'name': agent.name, | |
'avatar': chatbot_config.get( | |
'agent.avatar', | |
os.path.join(os.path.dirname(__file__), 'assets/logo.jpeg'), | |
), | |
'description': agent.description or "I'm a helpful assistant.", | |
} for agent in self.agent_list] | |
self.input_placeholder = chatbot_config.get('input.placeholder', '跟我聊聊吧~') | |
self.prompt_suggestions = chatbot_config.get('prompt.suggestions', []) | |
self.verbose = chatbot_config.get('verbose', False) | |
""" | |
Run the chatbot. | |
Args: | |
messages: The chat history. | |
""" | |
def run(self, | |
messages: List[Message] = None, | |
share: bool = False, | |
server_name: str = None, | |
server_port: int = None, | |
concurrency_limit: int = 10, | |
enable_mention: bool = False, | |
**kwargs): | |
self.run_kwargs = kwargs | |
from qwen_agent.gui.gradio import gr, mgr | |
customTheme = gr.themes.Default( | |
primary_hue=gr.themes.utils.colors.blue, | |
radius_size=gr.themes.utils.sizes.radius_none, | |
) | |
with gr.Blocks( | |
css=os.path.join(os.path.dirname(__file__), 'assets/appBot.css'), | |
theme=customTheme, | |
) as demo: | |
history = gr.State([]) | |
with gr.Row(elem_classes='container'): | |
with gr.Column(scale=4): | |
chatbot = mgr.Chatbot(value=convert_history_to_chatbot(messages=messages), | |
avatar_images=[ | |
self.user_config, | |
self.agent_config_list, | |
], | |
height=650, | |
avatar_image_width=80, | |
flushing=False, | |
show_copy_button=True, | |
latex_delimiters=[{ | |
'left': '\\(', | |
'right': '\\)', | |
'display': True | |
}, { | |
'left': '\\begin{equation}', | |
'right': '\\end{equation}', | |
'display': True | |
}, { | |
'left': '\\begin{align}', | |
'right': '\\end{align}', | |
'display': True | |
}, { | |
'left': '\\begin{alignat}', | |
'right': '\\end{alignat}', | |
'display': True | |
}, { | |
'left': '\\begin{gather}', | |
'right': '\\end{gather}', | |
'display': True | |
}, { | |
'left': '\\begin{CD}', | |
'right': '\\end{CD}', | |
'display': True | |
}, { | |
'left': '\\[', | |
'right': '\\]', | |
'display': True | |
}]) | |
input = mgr.MultimodalInput(placeholder=self.input_placeholder, upload_button_props=dict(file_types=[".pdf", ".docx", ".pptx", ".txt", ".html", ".csv", ".tsv", ".xlsx", ".xls"] + ["." + file_type for file_type in common_programming_language_extensions])) | |
with gr.Column(scale=1): | |
if len(self.agent_list) > 1: | |
agent_selector = gr.Dropdown( | |
[(agent.name, i) for i, agent in enumerate(self.agent_list)], | |
label='Agents', | |
info='选择一个Agent', | |
value=0, | |
interactive=True, | |
) | |
agent_info_block = self._create_agent_info_block() | |
# agent_plugins_block = self._create_agent_plugins_block() | |
if self.prompt_suggestions: | |
gr.Examples( | |
label='推荐对话', | |
examples=self.prompt_suggestions, | |
inputs=[input], | |
) | |
if len(self.agent_list) > 1: | |
agent_selector.change( | |
fn=self.change_agent, | |
inputs=[agent_selector], | |
outputs=[agent_selector, agent_info_block, agent_plugins_block], | |
queue=False, | |
) | |
input_promise = input.submit( | |
fn=self.add_text, | |
inputs=[input, chatbot, history], | |
outputs=[input, chatbot, history], | |
queue=False, | |
) | |
if len(self.agent_list) > 1 and enable_mention: | |
input_promise = input_promise.then( | |
self.add_mention, | |
[chatbot, agent_selector], | |
[chatbot, agent_selector], | |
).then( | |
self.agent_run, | |
[chatbot, history, agent_selector], | |
[chatbot, history, agent_selector], | |
) | |
else: | |
input_promise = input_promise.then( | |
self.agent_run, | |
[chatbot, history], | |
[chatbot, history], | |
) | |
input_promise.then(self.flushed, None, [input]) | |
demo.load(None) | |
demo.queue(default_concurrency_limit=concurrency_limit).launch(share=share, | |
server_name=server_name, | |
server_port=server_port) | |
def change_agent(self, agent_selector): | |
yield agent_selector, self._create_agent_info_block(agent_selector), self._create_agent_plugins_block( | |
agent_selector) | |
def add_text(self, _input, _chatbot, _history): | |
from qwen_agent.gui.gradio import gr | |
if _input.text == "/clear": | |
_chatbot = [] | |
_history.clear() | |
yield gr.update(interactive=False, value=""), _chatbot, _history | |
return | |
_history.append({ | |
ROLE: USER, | |
CONTENT: [{ | |
'text': _input.text | |
}], | |
}) | |
if self.user_config[NAME]: | |
_history[-1][NAME] = self.user_config[NAME] | |
if _input.files: | |
for file in _input.files: | |
if file.mime_type.startswith('image/'): | |
_history[-1][CONTENT].append({IMAGE: 'file://' + file.path}) | |
else: | |
_history[-1][CONTENT].append({FILE: file.path}) | |
_chatbot.append([_input, None]) | |
yield gr.update(interactive=False, value=None), _chatbot, _history | |
def add_mention(self, _chatbot, _agent_selector): | |
if len(self.agent_list) == 1: | |
yield _chatbot, _agent_selector | |
query = _chatbot[-1][0].text | |
match = re.search(r'@\w+\b', query) | |
if match: | |
_agent_selector = self._get_agent_index_by_name(match.group()[1:]) | |
agent_name = self.agent_list[_agent_selector].name | |
if ('@' + agent_name) not in query and self.agent_hub is None: | |
_chatbot[-1][0].text = '@' + agent_name + ' ' + query | |
yield _chatbot, _agent_selector | |
def agent_run(self, _chatbot, _history, _agent_selector=None): | |
if not _history: | |
if _agent_selector is not None: | |
yield _chatbot, _history, _agent_selector | |
else: | |
yield _chatbot, _history | |
return | |
if self.verbose: | |
logger.info('agent_run input:\n' + pprint.pformat(_history, indent=2)) | |
num_input_bubbles = len(_chatbot) - 1 | |
num_output_bubbles = 1 | |
_chatbot[-1][1] = [None for _ in range(len(self.agent_list))] | |
agent_runner = self.agent_list[_agent_selector or 0] | |
if self.agent_hub: | |
agent_runner = self.agent_hub | |
responses = [] | |
for responses in agent_runner.run(_history, **self.run_kwargs): | |
# usage = responses.usage | |
# responses = [Message(ASSISTANT, responses.output.choices[0].message.content)] | |
if not responses: | |
continue | |
if responses[-1][CONTENT] == PENDING_USER_INPUT: | |
logger.info('Interrupted. Waiting for user input!') | |
break | |
display_responses = convert_fncall_to_text(responses) | |
# display_responses[-1][CONTENT] += "\n<summary>" + repr({"usage": usage}) + "</summary>" | |
if not display_responses: | |
continue | |
if display_responses[-1][CONTENT] is None: | |
continue | |
while len(display_responses) > num_output_bubbles: | |
# Create a new chat bubble | |
_chatbot.append([None, None]) | |
_chatbot[-1][1] = [None for _ in range(len(self.agent_list))] | |
num_output_bubbles += 1 | |
assert num_output_bubbles == len(display_responses) | |
assert num_input_bubbles + num_output_bubbles == len(_chatbot) | |
for i, rsp in enumerate(display_responses): | |
agent_index = self._get_agent_index_by_name(rsp[NAME]) | |
_chatbot[num_input_bubbles + i][1][agent_index] = rsp[CONTENT] | |
if len(self.agent_list) > 1: | |
_agent_selector = agent_index | |
if _agent_selector is not None: | |
yield _chatbot, _history, _agent_selector | |
else: | |
yield _chatbot, _history | |
if responses: | |
for res in responses: | |
res['content'] = re.sub(r"\n<summary>input tokens.*</summary>", "", res['content']) | |
_history.extend([res for res in responses if res[CONTENT] != PENDING_USER_INPUT]) | |
if _agent_selector is not None: | |
yield _chatbot, _history, _agent_selector | |
else: | |
yield _chatbot, _history | |
if self.verbose: | |
logger.info('agent_run response:\n' + pprint.pformat(responses, indent=2)) | |
def flushed(self): | |
from qwen_agent.gui.gradio import gr | |
return gr.update(interactive=True) | |
def _get_agent_index_by_name(self, agent_name): | |
if agent_name is None: | |
return 0 | |
try: | |
agent_name = agent_name.strip() | |
for i, agent in enumerate(self.agent_list): | |
if agent.name == agent_name: | |
return i | |
return 0 | |
except Exception: | |
print_traceback() | |
return 0 | |
def _create_agent_info_block(self, agent_index=0): | |
from qwen_agent.gui.gradio import gr | |
agent_config_interactive = self.agent_config_list[agent_index] | |
return gr.HTML( | |
format_cover_html( | |
bot_name=agent_config_interactive['name'], | |
bot_description=agent_config_interactive['description'], | |
bot_avatar=agent_config_interactive['avatar'], | |
)) | |
def _create_agent_plugins_block(self, agent_index=0): | |
from qwen_agent.gui.gradio import gr | |
agent_interactive = self.agent_list[agent_index] | |
if agent_interactive.function_map: | |
capabilities = [key for key in agent_interactive.function_map.keys()] | |
return gr.CheckboxGroup( | |
label='插件', | |
value=capabilities, | |
choices=capabilities, | |
interactive=False, | |
) | |
else: | |
return gr.CheckboxGroup( | |
label='插件', | |
value=[], | |
choices=[], | |
interactive=False, | |
) | |