CogVLM-CogAgent / app.py
lykeven's picture
add grounding
6accf0d
raw
history blame
8.17 kB
#!/usr/bin/env python
import gradio as gr
import os
import re
from PIL import Image
import base64
import time
DESCRIPTION = '''# <a href="https://github.com/THUDM/CogVLM">VisualGLM</a>'''
MAINTENANCE_NOTICE1 = 'Hint 1: If the app report "Something went wrong, connection error out", please turn off your proxy and retry.<br>Hint 2: If you upload a large size of image like 10MB, it may take some time to upload and process. Please be patient and wait.'
GROUNDING_NOTICE = 'Hint: When you check "Grounding", please use the <a href="https://github.com/THUDM/CogVLM/blob/main/utils/template.py#L344">corresponding prompt</a> or the examples below.'
NOTES = 'This app is adapted from <a href="https://github.com/THUDM/CogVLM">https://github.com/THUDM/CogVLM</a>. It would be recommended to check out the repo if you want to see the detail of our model.'
import json
import requests
import base64
import hashlib
from utils import parse_response
default_chatbox = [("", "Hi, What do you want to know about this image?")]
URL = os.environ.get("URL")
def process_image(image_prompt):
image = Image.open(image_prompt)
print(f"height:{image.height}, width:{image.width}")
resized_image = image.resize((224, 224), )
timestamp = int(time.time())
file_ext = os.path.splitext(image_prompt)[1]
filename = f"examples/{timestamp}{file_ext}"
resized_image.save(filename)
print(f"temporal filename {filename}")
with open(filename, "rb") as image_file:
bytes = base64.b64encode(image_file.read())
encoded_img = str(bytes, encoding='utf-8')
image_hash = hashlib.sha256(bytes).hexdigest()
os.remove(filename)
return encoded_img, image_hash
def process_image_without_resize(image_prompt):
image = Image.open(image_prompt)
print(f"height:{image.height}, width:{image.width}")
timestamp = int(time.time())
file_ext = os.path.splitext(image_prompt)[1]
filename = f"examples/{timestamp}{file_ext}"
filename_grounding = f"examples/{timestamp}_grounding{file_ext}"
image.save(filename)
print(f"temporal filename {filename}")
with open(filename, "rb") as image_file:
bytes = base64.b64encode(image_file.read())
encoded_img = str(bytes, encoding='utf-8')
image_hash = hashlib.sha256(bytes).hexdigest()
os.remove(filename)
return image, encoded_img, image_hash, filename_grounding
def is_chinese(text):
zh_pattern = re.compile(u'[\u4e00-\u9fa5]+')
return zh_pattern.search(text)
def post(
input_text,
temperature,
top_p,
image_prompt,
result_previous,
hidden_image,
grounding
):
result_text = [(ele[0], ele[1]) for ele in result_previous]
for i in range(len(result_text)-1, -1, -1):
if result_text[i][0] == "" or result_text[i][0] == None:
del result_text[i]
print(f"history {result_text}")
is_zh = is_chinese(input_text)
if image_prompt is None:
print("Image empty")
if is_zh:
result_text.append((input_text, '图片为空!请上传图片并重试。'))
else:
result_text.append((input_text, 'Image empty! Please upload a image and retry.'))
return input_text, result_text, hidden_image
elif input_text == "":
print("Text empty")
result_text.append((input_text, 'Text empty! Please enter text and retry.'))
return "", result_text, hidden_image
headers = {
"Content-Type": "application/json; charset=UTF-8",
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36",
}
if image_prompt:
pil_img, encoded_img, image_hash, image_path_grounding = process_image_without_resize(image_prompt)
print(f"image_hash:{image_hash}, hidden_image_hash:{hidden_image}")
if hidden_image is not None and image_hash != hidden_image:
print("image has been update")
result_text = []
hidden_image = image_hash
else:
encoded_img = None
print('request chat model...' if not grounding else 'request grounding model...')
data = json.dumps({
'text': input_text,
'image': encoded_img,
'temperature': temperature,
'top_p': top_p,
'history': result_text,
'is_grounding': grounding
})
try:
response = requests.request("POST", URL, headers=headers, data=data, timeout=(60, 100)).json()
except Exception as e:
print("error message", e)
if is_zh:
result_text.append((input_text, '超时!请稍等几分钟再重试。'))
else:
result_text.append((input_text, 'Timeout! Please wait a few minutes and retry.'))
return "", result_text, hidden_image
print('request done...')
# response = {'result':input_text}
answer = str(response['result'])
if grounding:
parse_response(pil_img, answer, image_path_grounding)
new_answer = answer.replace(input_text, "")
result_text.append((input_text, new_answer))
result_text.append((None, (image_path_grounding,)))
else:
result_text.append((input_text, answer))
print(result_text)
print('finished')
return "", result_text, hidden_image
def clear_fn(value):
return "", default_chatbox, None
def clear_fn2(value):
return default_chatbox
def main():
gr.close_all()
examples = []
with open("./examples/example_inputs.jsonl") as f:
for line in f:
data = json.loads(line)
examples.append(data)
with gr.Blocks(css='style.css') as demo:
with gr.Row():
with gr.Column(scale=4.5):
with gr.Group():
input_text = gr.Textbox(label='Input Text', placeholder='Please enter text prompt below and press ENTER.')
with gr.Row():
run_button = gr.Button('Generate')
clear_button = gr.Button('Clear')
image_prompt = gr.Image(type="filepath", label="Image Prompt", value=None)
with gr.Row():
grounding = gr.Checkbox(label="Grounding")
with gr.Row():
grounding_notice = gr.Markdown(GROUNDING_NOTICE)
with gr.Row():
temperature = gr.Slider(maximum=1, value=0.8, minimum=0, label='Temperature')
top_p = gr.Slider(maximum=1, value=0.4, minimum=0, label='Top P')
with gr.Column(scale=5.5):
result_text = gr.components.Chatbot(label='Multi-round conversation History', value=[("", "Hi, What do you want to know about this image?")]).style(height=550)
hidden_image_hash = gr.Textbox(visible=False)
gr_examples = gr.Examples(examples=[[example["text"], example["image"]] for example in examples],
inputs=[input_text, image_prompt],
label="Example Inputs (Click to insert an examplet into the input box)",
examples_per_page=6)
gr.Markdown(MAINTENANCE_NOTICE1)
gr.Markdown(NOTES)
print(gr.__version__)
run_button.click(fn=post,inputs=[input_text, temperature, top_p, image_prompt, result_text, hidden_image_hash, grounding],
outputs=[input_text, result_text, hidden_image_hash])
input_text.submit(fn=post,inputs=[input_text, temperature, top_p, image_prompt, result_text, hidden_image_hash, grounding],
outputs=[input_text, result_text, hidden_image_hash])
clear_button.click(fn=clear_fn, inputs=clear_button, outputs=[input_text, result_text, image_prompt])
image_prompt.upload(fn=clear_fn2, inputs=clear_button, outputs=[result_text])
image_prompt.clear(fn=clear_fn2, inputs=clear_button, outputs=[result_text])
print(gr.__version__)
demo.queue(concurrency_count=10)
demo.launch()
if __name__ == '__main__':
main()