AI-book / app.py
Omnibus's picture
Update app.py
8e8a987 verified
raw
history blame
23 kB
from huggingface_hub import InferenceClient, HfApi, upload_file
import datetime
import gradio as gr
import requests
import random
import prompts
import json
import uuid
import os
token=os.environ.get("HF_TOKEN")
username="omnibus"
dataset_name="tmp"
save_data=f'https://huggingface.co/datasets/{username}/{dataset_name}/raw/main/'
api=HfApi(token="")
VERBOSE=False
history = []
hist_out= []
summary =[]
main_point=[]
summary.append("")
main_point.append("")
list_of_users=["user1","user2","user3"]
persona=[
{"name":"Mr. Nice Guy", "description":"Nice","personality":"friendly, caring, helpful and informative. You always compliment people, and stick up for them, and you have no patience for bullies."},
{"name":"Mr. Mean Guy", "description":"Mean","personality":"a total asshole. You think you are really smart, but really you are just ignorant and mean. You don't have time for everybodies stupidity, and you let them know that in the comments."},
{"name":"Smarty Pants", "description":"Genius","personality":"intelligent, informative, know-it-all. You are the smartest guy in the room and always one-up the blog poster to show how mart you are."},
{"name":"Try Hard", "description":"Not Genius","personality":"dimwitted, lacking understanding about any topic. You always ask really irrelevant questions about the post."},
{"name":"Class Clown", "description":"Humerous","personality":"humerous, funny. You turn everything into a joke. Make a joke about the post."},
]
persona_names=[]
for ea in persona:
persona_names.append(ea['name'])
models=[
"mistralai/Mixtral-8x7B-Instruct-v0.1",
"mistralai/Mixtral-8x7B-Instruct-v0.2",
"google/gemma-7b",
"google/gemma-7b-it",
"google/gemma-2b",
"google/gemma-2b-it",
"meta-llama/Llama-2-7b-chat-hf",
"codellama/CodeLlama-70b-Instruct-hf",
"openchat/openchat-3.5-0106",
"NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
]
client_z=[]
def load_models(inp):
if VERBOSE==True:
print(type(inp))
print(inp)
print(models[inp])
client_z.clear()
client_z.append(InferenceClient(models[inp]))
#if "mistralai" in models[inp]:
# tokens = gr.Slider(label="Max new tokens",value=1600,minimum=0,maximum=8000,step=64,interactive=True, visible=True,info="The maximum number of tokens")
return gr.update(label=models[inp])
def format_prompt(message, history):
prompt = "<s>"
for user_prompt, bot_response in history:
prompt += f"[INST] {user_prompt} [/INST]"
prompt += f" {bot_response}</s> "
prompt += f"[INST] {message} [/INST]"
return prompt
agents =[
"COMMENTER",
"BLOG_POSTER",
"REPLY_TO_COMMENTER",
"COMPRESS_HISTORY_PROMPT"
]
temperature=0.9
max_new_tokens=256
max_new_tokens2=4000
top_p=0.95
repetition_penalty=1.0,
def compress_history(formatted_prompt):
print("###############\nRUNNING COMPRESS HISTORY\n###############\n")
seed = random.randint(1,1111111111111111)
agent=prompts.COMPRESS_HISTORY_PROMPT.format(history=summary[0],focus=main_point[0])
system_prompt=agent
temperature = 0.9
if temperature < 1e-2:
temperature = 1e-2
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=1048,
top_p=0.95,
repetition_penalty=1.0,
do_sample=True,
seed=seed,
)
#history.append((prompt,""))
#formatted_prompt = format_prompt(f"{system_prompt}, {prompt}", history)
formatted_prompt = formatted_prompt
client=client_z[0]
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
for response in stream:
output += response.token.text
#history.append((output,history))
print(output)
print(main_point[0])
return output
def comment_generate(prompt, history,post_check,full_conv,persona2, agent_name=agents[0], sys_prompt="", temperature=0.9, max_new_tokens=1028, top_p=0.95, repetition_penalty=1.3,):
current_time = str(datetime.datetime.now())
uid=uuid.uuid4()
print(post_check)
print("###############\nRUNNING QUESTION GENERATOR\n###############\n")
seed = random.randint(1,1111111111111111)
agent=prompts.COMMENTER.format(focus=post_check['output'],persona=persona[persona2]['personality'])
system_prompt=agent
temperature = float(temperature)
if temperature < 1e-2:
temperature = 1e-2
top_p = float(top_p)
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=seed,
)
#history.append((prompt,""))
#formatted_prompt = format_prompt(f"{system_prompt}, {prompt}", history)
#[INST] {message} [/INST]
formatted_prompt = f"[INST] {system_prompt}, {prompt} [/INST]"
client=client_z[0]
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
for response in stream:
output += response.token.text
history.append((output,None))
#comment_cnt=post_check['comment']
#print(type(comment_cnt))
#post_check['comment']=comment_cnt+1
reply_json= {'user':'','datetime':'','reply':''}
comment_json= {'user':persona[persona2]['name'],'datetime':current_time,'comment':output,'reply_list':[]}
comment_out=post_check['comment_list']
print(comment_out)
comment_out.append(comment_json)
out_json = {'user':post_check['user'],'datetime':post_check['datetime'],'file_name':post_check['file_name'],
'title':post_check['title'],'blog':1,'comment':post_check['comment']+1,'reply':post_check['reply'],
"prompt":post_check['prompt'],"output":post_check['output'],'comment_list':comment_out}
#out_json = {'user':"",'datetime':current_time,'title':title,'blog':1,'comment':0,'reply':0,"prompt":prompt,"output":output}
#full_conv[-1]+=(output,)
#full_conv.append((None,output,None))
html_out=load_html(out_json)
#out_json = {'user':list_of_users[0],'datetime':current_time,'file_name':filename,'title':title,'blog':1,'comment':0,'reply':0,"prompt":prompt,"output":output,'comment_list':[]}
file_n = f'{post_check["file_name"]}.json'
print(file_n)
'''
r = requests.get(f'{save_data}book1/{file_n}')
print(f'status code main:: {r.status_code}')
if r.status_code==200:
try:
lod = json.loads(r.text)
print(f'lod:: {lod}')
lod[0]['comment']=lod[0]['comment']+1
lod[0]['comment_list'].append({'user':persona[persona2]['name'],'datetime':'','comment':output,'reply_list':[]})
#hist_out.append(out_json)
#try:
# for ea in
'''
with open(f'{uid}.json', 'w') as f:
json_hist=json.dumps(out_json, indent=4)
f.write(json_hist)
f.close()
upload_file(
path_or_fileobj =f"{uid}.json",
path_in_repo = f"book1/{file_n}",
repo_id =f"{username}/{dataset_name}",
repo_type = "dataset",
token=token,
)
#except Exception as e:
# print(e)
return "",history,out_json,out_json,out_json,html_out
def reply_generate(prompt, history,post_check,full_conv,persona1, agent_name=agents[0], sys_prompt="", temperature=0.9, max_new_tokens=1028, top_p=0.95, repetition_penalty=1.0,):
#def question_generate(prompt, history):
current_time = str(datetime.datetime.now())
uid=uuid.uuid4()
print(post_check)
#full_conv=history
print(f'full_conv::\n{full_conv}')
print("###############\nRUNNING REPLY GENERATOR\n###############\n")
seed = random.randint(1,1111111111111111)
agent=prompts.REPLY_TO_COMMENTER.format(focus=full_conv[0][0],comment=full_conv[1][1],persona=persona[persona1]['personality'])
system_prompt=agent
temperature = float(temperature)
if temperature < 1e-2:
temperature = 1e-2
top_p = float(top_p)
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=seed,
)
#history.append((prompt,""))
#formatted_prompt = format_prompt(f"{system_prompt}, {prompt}", history)
formatted_prompt = f"[INST] {system_prompt}, {prompt} [/INST]"
client=client_z[0]
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
for response in stream:
output += response.token.text
history.append((output,None))
#reply_cnt=post_check['reply']
#post_check['reply']=reply_cnt+1
reply_json= {'user':persona[persona1]['name'],'datetime':current_time,'reply':output}
#comment_json= {'user':persona[persona2]['name'],'datetime':current_time,'comment':output,'reply_list':[reply_json]}
out_json = {'user':post_check['user'],'datetime':post_check['datetime'],'file_name':post_check['file_name'],
'title':post_check['title'],'blog':1,'comment':post_check['comment'],'reply':post_check['reply']+1,
"prompt":post_check['prompt'],"output":post_check['output'],'comment_list':post_check['comment_list']['reply_list'].append(reply_json)}
#out_json = {'user':"",'datetime':current_time,'title':title,'blog':1,'comment':0,'reply':0,"prompt":prompt,"output":output}
#full_conv[-1]+=(output,)
#full_conv.append((None,output,None))
html_out=load_html(out_json)
#out_json = {'user':list_of_users[0],'datetime':current_time,'file_name':filename,'title':title,'blog':1,'comment':0,'reply':0,"prompt":prompt,"output":output,'comment_list':[]}
file_n = f'{post_check["file_name"]}.json'
print(file_n)
with open(f'{uid}.json', 'w') as f:
json_hist=json.dumps(out_json, indent=4)
f.write(json_hist)
f.close()
upload_file(
path_or_fileobj =f"{uid}.json",
path_in_repo = f"book1/{file_n}",
repo_id =f"{username}/{dataset_name}",
repo_type = "dataset",
token=token,
)
#except Exception as e:
# print(e)
return "",history,out_json,out_json,out_json,html_out
'''
hist_out.append(out_json)
#try:
# for ea in
with open(f'{uid}.json', 'w') as f:
json_hist=json.dumps(hist_out, indent=4)
f.write(json_hist)
f.close()
upload_file(
path_or_fileobj =f"{uid}.json",
path_in_repo = f"book1/{filename}.json",
repo_id =f"{username}/{dataset_name}",
repo_type = "dataset",
token=token,
)
#out_json = {'user':"",'datetime':current_time,'title':title,'blog':1,'comment':0,'reply':0,"prompt":prompt,"output":output}
#full_conv[-1]+=(output,)
#full_conv.append((None,None,output))
html_out=load_html(post_check)
file_n = f'{post_check["filename"]}.json'
print(file_n)
r = requests.get(f'{save_data}book1/{file_n}')
print(f'status code main:: {r.status_code}')
if r.status_code==200:
try:
lod = json.loads(r.text)
print(f'lod:: {lod}')
lod[0]['reply']=lod[0]['reply']+1
lod[0]['comment_list'][0]['reply_list'].append({'user':persona[persona1]['name'],'datetime':'','reply':output})
#hist_out.append(out_json)
#try:
# for ea in
with open(f'{uid}.json', 'w') as f:
json_hist=json.dumps(lod, indent=4)
f.write(json_hist)
f.close()
upload_file(
path_or_fileobj =f"{uid}.json",
path_in_repo = f"book1/{file_n}",
repo_id =f"{username}/{dataset_name}",
repo_type = "dataset",
token=token,
)
except Exception as e:
print(e)
return "",history,lod[0],lod[0],lod[0],html_out
'''
def create_valid_filename(invalid_filename: str) -> str:
"""Converts invalid characters in a string to be suitable for a filename."""
invalid_filename.replace(" ","-")
valid_chars = '-'.join(invalid_filename.split())
allowed_chars = ('a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '_', '-')
return ''.join(char for char in valid_chars if char in allowed_chars)
def load_html(conv):
ht=""
comm=0
#for i,ea in enumerate(inp):
#blog,comm,repl=ea
#print(f'outp:: {outp}')
#print(f'prom:: {prom}')
ht+=f"""<div class="div_box">"""
ht+=f"""<pre class="bpost"><div class="bhead"><h2>{conv['title']}</h2><br><h5>{conv['user']}</h5></div>{conv['output']}</pre>"""
if conv['comment_list']:
for com in conv['comment_list']:
ht+=f"""<pre class="resp1"><div class="bhead"></div>{com}</pre>"""
for ea in conv['comment_list']:
if conv['comment_list'][ea]['reply_list']:
for repl in conv['comment_list'][ea]['reply_list']:
ht+=f"""<pre class="resp2"><div class="bhead"></div>{repl}</pre>"""
ht+=f"""</div>"""
'''
if inp:
for i,ea in enumerate(inp):
blog,comm,repl=ea
#print(f'outp:: {outp}')
#print(f'prom:: {prom}')
ht+=f"""<div class="div_box">"""
if blog:
#ht+=f"""<div class="bhead"><div><h1>$btitle</h1></div><div>$user_name</div></div>"""
ht+=f"""<pre class="bpost"><div class="bhead"><h2>{conv['title']}</h2><br><h5>{conv['user']}</h5></div>{blog}</pre>"""
if comm:
ht+=f"""<pre class="resp1"><div class="bhead"></div>{comm}</pre>"""
if repl:
ht+=f"""<pre class="resp2"><div class="bhead"></div>{repl}</pre>"""
ht+=f"""</div>"""
'''
with open('index.html','r') as h:
html=h.read()
html = html.replace("$body",f"{ht}")
#html = html.replace("$title",f"{title}")
#html = html.replace("$title",f"{title}")
#html = html.replace("$user_name",f"{user_name}")
h.close()
return html
def load_html_OG(inp,title):
ht=""
if inp:
for i,ea in enumerate(inp):
outp,prom=ea
#print(f'outp:: {outp}')
#print(f'prom:: {prom}')
if i == 0:
ht+=f"""<div class="div_box">
<pre class="bpost">{outp}</pre>
<pre class="resp1">{prom}</pre>
</div>"""
else:
ht+=f"""<div class="div_box">
<pre class="resp2">{outp}</pre>
<pre class="resp2">{prom}</pre>
</div>"""
with open('index.html','r') as h:
html=h.read()
html = html.replace("$body",f"{ht}")
html = html.replace("$title",f"{title}")
h.close()
return html
def generate(prompt, history, post_check,full_conv,persona1, agent_name=agents[0], sys_prompt="", temperature=0.9, max_new_tokens=1048, top_p=0.95, repetition_penalty=1.0):
print(persona1)
html_out=""
#main_point[0]=prompt
#print(datetime.datetime.now())
uid=uuid.uuid4()
current_time = str(datetime.datetime.now())
title=""
filename=create_valid_filename(f'{current_time}---{title}')
current_time=current_time.replace(":","-")
current_time=current_time.replace(".","-")
print (current_time)
agent=prompts.BLOG_POSTER.format(persona=persona[persona1]['personality'])
system_prompt=agent
temperature = float(temperature)
if temperature < 1e-2:
temperature = 1e-2
top_p = float(top_p)
hist_out=[]
sum_out=[]
json_hist={}
json_obj={}
#full_conv=[]
post_cnt=1
if not post_check:
post_check={}
#if not full_conv:
# full_conv=[]
seed = random.randint(1,1111111111111111)
if not post_check:
print("writing blog")
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens2,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=seed,
)
if prompt.startswith(' \"'):
prompt=prompt.strip(' \"')
formatted_prompt = format_prompt(f"{system_prompt}, {prompt}", history)
if len(formatted_prompt) < (40000):
print(len(formatted_prompt))
client=client_z[0]
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
#if history:
# yield history
if not prompt:
prompt_out = None
else:
prompt_out=prompt
for response in stream:
output += response.token.text
yield "", [(prompt_out,output)],post_check,post_check,summary[0],json_obj, json_hist,html_out
if not title:
for line in output.split("\n"):
if "title" in line.lower() and ":" in line.lower():
title = line.split(":")[1]
if title.startswith(' \"'):
title=title.strip(' \"')
print(f'title:: {title}')
filename=create_valid_filename(f'{current_time}---{title}')
#out_json = {'user':persona[persona1]['name'],'datetime':current_time,'file_name':filename,'title':title,'blog':1,'comment':0,'reply':0,"prompt":prompt,"output":output,'comment_list':[]}
reply_json= {'user':'','datetime':'','reply':''}
comment_json= {'user':'','datetime':'','comment':'','reply_list':[reply_json]}
out_json = {'user':persona[persona1]['name'],'datetime':current_time,'file_name':filename,
'title':title,'blog':1,'comment':0,'reply':0,
"prompt":prompt,"output":output,'comment_list':[]}
#hist_out.append(out_json)
#try:
# for ea in
with open(f'{uid}.json', 'w') as f:
json_hist=json.dumps(out_json, indent=4)
f.write(json_hist)
f.close()
upload_file(
path_or_fileobj =f"{uid}.json",
path_in_repo = f"book1/{filename}.json",
repo_id =f"{username}/{dataset_name}",
repo_type = "dataset",
token=token,
)
else:
formatted_prompt = format_prompt(f"{prompts.COMPRESS_HISTORY_PROMPT.format(history=summary[0],focus=main_point[0])}, {summary[0]}", history)
#current_time = str(datetime.datetime.now().timestamp()).split(".",1)[0]
#filename=f'{filename}-{current_time}'
history = []
output = compress_history(formatted_prompt)
summary[0]=output
sum_json = {"summary":summary[0]}
sum_out.append(sum_json)
with open(f'{uid}-sum.json', 'w') as f:
json_obj=json.dumps(sum_out, indent=4)
f.write(json_obj)
f.close()
upload_file(
path_or_fileobj =f"{uid}-sum.json",
path_in_repo = f"book1/{filename}-summary.json",
repo_id =f"{username}/{dataset_name}",
repo_type = "dataset",
token=token,
)
#prompt = question_generate(output, history)
#main_point[0]=output
#full_conv.append((output,None,None))
html_out=load_html(out_json)
#post_check={'filename':filename,'user':persona[persona1]['name'],'datetime':current_time,'title':title,'blog':1,'comment':0,'reply':0}
yield prompt, history,out_json,out_json,summary[0],out_json,json_hist,html_out
else:
print("passing blog")
with gr.Blocks() as app:
chat_handler=gr.State()
post_handler=gr.State()
html = gr.HTML()
chatbot=gr.Chatbot(visible=False)
msg = gr.Textbox()
with gr.Row():
submit_b = gr.Button("Blog Post")
submit_c = gr.Button("Comment")
submit_r = gr.Button("OP Reply")
with gr.Row():
stop_b = gr.Button("Stop")
clear = gr.ClearButton([msg, chatbot])
with gr.Row():
m_choice=gr.Dropdown(label="Models",type='index',choices=[c for c in models],value=models[0],interactive=True)
tokens = gr.Slider(label="Max new tokens",value=1600,minimum=0,maximum=8000,step=64,interactive=True, visible=True,info="The maximum number of tokens")
with gr.Row():
#persona1=gr.Textbox(label="Bot 1 Persona",info="Your personallity can be be described as...")
#persona2=gr.Textbox(label="Bot 2 Persona",info="Your personallity can be be described as...")
persona1=gr.Dropdown(label="Bot 1 Persona",value=persona_names[0],type='index',choices=[p for p in persona_names])
persona2=gr.Dropdown(label="Bot 2 Persona",value=persona_names[3],type='index',choices=[p for p in persona_names])
sumbox=gr.Textbox("Summary", max_lines=100)
with gr.Column():
sum_out_box=gr.JSON(label="Summaries")
hist_out_box=gr.JSON(label="History")
m_choice.change(load_models,m_choice,[chatbot])
#app.load(load_models,m_choice,[chatbot]).then(load_html,None,html)
app.load(load_models,m_choice,[chatbot])
sub_b = submit_b.click(generate, [msg,chatbot,post_handler,chat_handler,persona1,tokens],[msg,chatbot,post_handler,chat_handler,sumbox,sum_out_box,hist_out_box,html])
sub_c = submit_c.click(comment_generate, [msg,chatbot,post_handler,chat_handler,persona2],[msg,chatbot,sumbox,sum_out_box,hist_out_box,html])
sub_r = submit_r.click(reply_generate, [msg,chatbot,post_handler,chat_handler,persona1],[msg,chatbot,sumbox,sum_out_box,hist_out_box,html])
sub_e = msg.submit(generate, [msg,chatbot,post_handler,chat_handler,chat_handler,persona1,tokens],[msg,chatbot,post_handler,chat_handler,sumbox,sum_out_box,hist_out_box,html])
stop_b.click(None,None,None, cancels=[sub_b,sub_e,sub_c,sub_r])
app.queue(default_concurrency_limit=20).launch()