hcare / app.py
orrinin's picture
Update app.py
ca12f97 verified
import os
import base64
import gradio as gr
from gradio_client import Client, file
import time
import json
import threading
MODEL_NAME = "Qwen"
client_chat = os.environ.get("CHAT_URL")
client_vl = os.environ.get("VL_URL")
def read(filename):
with open(filename) as f:
data = f.read()
return data
SYS_PROMPT = read('system_prompt.txt')
DESCRIPTION = '''
<div>
<h1 style="text-align: center;">家庭医生demo</h1>
<p>🩺一个帮助您分析症状和检验报告的家庭医生(AI诊疗助手)。</p>
<p>🔎 选择您需要咨询的科室医生,在输入框中输入症状描述或者体检信息等;您也可以在图片框中上传检测报告图。</p>
<p>🦕 请注意生成信息可能不准确,且不具备任何实际参考价值,如有需要请联系专业医生。</p>
<p>💾 因网络延迟问题,图片识别未及时出现分析内容,请间隔多次点击重载,手动尝试获取。</p>
</div>
'''
css = """
h1 {
text-align: center;
display: block;
}
footer {
display:none !important
}
"""
LICENSE = '当前采用 ' + MODEL_NAME + ' 模型,请主动移除个人信息,注意隐私保护🔔'
local_data = threading.local()
def init_data():
if not hasattr(local_data, 'results'):
local_data.results = ""
def process_text(text_input, unit):
init_data()
client = Client(client_chat)
# print(client.view_api())
job = client.submit(
query=str(text_input),
history=None,
system=f"You are a experienced {unit} doctor AI assistant." + SYS_PROMPT,
api_name="/model_chat"
)
response = job.result()
local_data.results = response[1][0][1]
return local_data.results
'''
def update(json_path: str, event: threading.Event):
init_data()
event.wait()
client = Client(client_vl)
response = client.predict(
json_path,
fn_index=1
)
with open(response, 'r') as f:
data = json.load(f)
local_data.results = data[-1][1]
print(local_data.results)
'''
def process_image(image_input, unit):
init_data()
if image_input is not None:
image = str(image_input)
print(image)
#with open(image_input, "rb") as f:
# base64_image = base64.b64encode(f.read()).decode("utf-8")
client = Client(client_vl)
# print(client.view_api())
prompt = f" You are a experienced {unit} doctor AI assistant." + SYS_PROMPT + "Help me understand what is in this picture and analysis."
res5 = client.predict(
"",
image,
fn_index=5
)
print(res5)
res0 = client.predict(
res5,
prompt,
fn_index=0
)
print(res0)
'''
event = threading.Event()
t = threading.Thread(target=update, args=(res0, event))
t.start()
local_data.results = "正在分析....."
event.set()
'''
job = client.submit(
res0,
fn_index=1
)
response = job.result()
with open(response, 'r') as f:
data = json.load(f)
local_data.results = data[-1][1]
return local_data.results
def fetch_result():
init_data()
return local_data.results
def reset_result():
init_data()
print(local_data.results)
local_data.results = ""
def main(text_input="", image_input=None, unit=""):
reset_result()
if text_input and image_input is None:
return process_text(text_input, unit)
elif image_input is not None:
return process_image(image_input, unit)
with gr.Blocks(css=css, title="家庭医生AI助手", theme="soft") as iface:
with gr.Accordion(""):
gr.Markdown(DESCRIPTION)
unit = gr.Dropdown(label="🩺科室", value='中医科', elem_id="units",
choices=["中医科", "内科", "外科", "妇产科", "儿科", \
"五官科", "男科", "皮肤性病科", "传染科", "精神心理科", \
"整形美容科", "营养科", "生殖中心", "麻醉医学科", "医学影像科", \
"骨科", "肿瘤科", "急诊科", "检验科"])
with gr.Row():
output_box = gr.Textbox(label="分析")
with gr.Row():
image_input = gr.Image(type="filepath", label="上传图片") # Create an image upload button
text_input = gr.Textbox(label="输入") # Create a text input box
with gr.Row():
submit_btn = gr.Button("🚀 发送") # Create a submit button
fresh_btn = gr.Button("✨ 重载")
clear_btn = gr.ClearButton([output_box, image_input, text_input], value="🗑️ 清空") # Create a clear button
# Set up the event listeners
submit_btn.click(main, inputs=[text_input, image_input, unit], outputs=output_box)
fresh_btn.click(fn=fetch_result, outputs=output_box)
gr.Markdown(LICENSE)
#gr.close_all()
iface.queue().launch(show_api=False) # Launch the Gradio interface