import spaces from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer import torch import gradio as gr import logging from huggingface_hub import login import os import traceback from threading import Thread from random import shuffle, choice logging.basicConfig(level=logging.DEBUG) SPACER = '\n' + '*' * 40 + '\n' HF_TOKEN = os.environ.get("HF_TOKEN", None) login(token=HF_TOKEN) system_prompts = { "English": "You are a helpful chatbot that answers user input in a concise and witty way.", "German": "Du bist ein hilfreicher Chatbot, der Usereingaben knapp und originell beantwortet.", "French": "Tu es un chatbot utile qui répond aux questions des utilisateurs de manière concise et originale.", "Spanish": "Eres un chatbot servicial que responde a las entradas de los usuarios de forma concisa y original." } htmL_info = "

Pharia Battle Royale

Let the games begin: In this bot arena, the Pharia 1 model competes against a challenger. Try a prompt in a language you want to explore. Set the parameters and vote for the best answers. After casting your vote, the bots reveal their identity. Inputs, outputs and votes are logged anonymously for further insight.

" model_info = [{"id": "Aleph-Alpha/Pharia-1-LLM-7B-control-hf", "name": "Pharia 1 LLM 7B control hf"}] challenger_models = [{"id": "NousResearch/Meta-Llama-3.1-8B-Instruct", "name": "Meta Llama 3.1 8B Instruct"}, {"id": "mistralai/Mistral-7B-Instruct-v0.3", "name": "Mistral 7B Instruct v0.3"}] challenger_model = choice(challenger_models) model_info.append(challenger_model) shuffle(model_info) device = "cuda" try: tokenizer_a = AutoTokenizer.from_pretrained(model_info[0]['id']) model_a = AutoModelForCausalLM.from_pretrained( model_info[0]['id'], torch_dtype=torch.float16, device_map="auto", trust_remote_code=True, ) tokenizer_b = AutoTokenizer.from_pretrained(model_info[1]['id']) model_b = AutoModelForCausalLM.from_pretrained( model_info[1]['id'], torch_dtype=torch.float16, device_map="auto", trust_remote_code=True, ) except Exception as e: logging.error(f'{SPACER} Error: {e}, Traceback {traceback.format_exc()}') def apply_pharia_template(messages, add_generation_prompt=False): """Chat template not defined in Pharia model configs. Adds chat template for Pharia. Expects a list of messages. add_generation_prompt:bool extends tmplate for generation. """ pharia_template = """<|begin_of_text|>""" role_map = { "system": "<|start_header_id|>system<|end_header_id|>\n", "user": "<|start_header_id|>user<|end_header_id|>\n", "assistant": "<|start_header_id|>assistant<|end_header_id|>\n", } for message in messages: role = message["role"] content = message["content"] pharia_template += role_map.get(role, "") + content + "<|eot_id|>\n" if add_generation_prompt: pharia_template += "<|start_header_id|>assistant<|end_header_id|>\n" return pharia_template @spaces.GPU() def generate_both(system_prompt, input_text, chatbot_a, chatbot_b, max_new_tokens=2048, temperature=0.2, top_p=0.9, repetition_penalty=1.1): try: text_streamer_a = TextIteratorStreamer(tokenizer_a, skip_prompt=True) text_streamer_b = TextIteratorStreamer(tokenizer_b, skip_prompt=True) system_prompt_list = [{"role": "system", "content": system_prompt}] if system_prompt else [] input_text_list = [{"role": "user", "content": input_text}] chat_history_a = [] for user, assistant in chatbot_a: chat_history_a.append({"role": "user", "content": user}) chat_history_a.append({"role": "assistant", "content": assistant}) chat_history_b = [] for user, assistant in chatbot_b: chat_history_b.append({"role": "user", "content": user}) chat_history_b.append({"role": "assistant", "content": assistant}) new_messages_a = system_prompt_list + chat_history_a + input_text_list new_messages_b = system_prompt_list + chat_history_b + input_text_list if "Pharia" in model_info[0]['id']: formatted_conversation = apply_pharia_template(messages=new_messages_a, add_generation_prompt=True) tokenized = tokenizer_a(formatted_conversation, return_tensors="pt").to(device) logging.debug(tokenized) #attention_mask input_ids_a = tokenized.input_ids tokenizer_a.eos_token = "<|endoftext|>" # not set für Pharia tokenizer_a.pad_token = "<|padding|>" # not set für Pharia else: input_ids_a = tokenizer_a.apply_chat_template( new_messages_a, add_generation_prompt=True, dtype=torch.float16, return_tensors="pt" ).to(device) if "Pharia" in model_info[1]['id']: formatted_conversation = apply_pharia_template(messages=new_messages_a, add_generation_prompt=True) tokenized = tokenizer_b(formatted_conversation, return_tensors="pt").to(device) logging.debug(tokenized) input_ids_b = tokenized.input_ids logging.debug(f'tokenizer_b.pad_token is {tokenizer_b.pad_token}') tokenizer_b.eos_token = "<|endoftext|>" # not set für Pharia tokenizer_b.pad_token = "<|padding|>" # not set für Pharia else: input_ids_b = tokenizer_b.apply_chat_template( new_messages_b, add_generation_prompt=True, dtype=torch.float16, return_tensors="pt" ).to(device) generation_kwargs_a = dict( input_ids=input_ids_a, streamer=text_streamer_a, max_new_tokens=max_new_tokens, pad_token_id=tokenizer_a.eos_token_id, do_sample=True, temperature=temperature, top_p=top_p, repetition_penalty=repetition_penalty, ) generation_kwargs_b = dict( input_ids=input_ids_b, streamer=text_streamer_b, max_new_tokens=max_new_tokens, pad_token_id=tokenizer_b.eos_token_id, do_sample=True, temperature=temperature, top_p=top_p, repetition_penalty=repetition_penalty, ) thread_a = Thread(target=model_a.generate, kwargs=generation_kwargs_a) thread_b = Thread(target=model_b.generate, kwargs=generation_kwargs_b) thread_a.start() thread_b.start() chatbot_a.append([input_text, ""]) chatbot_b.append([input_text, ""]) finished_a = False finished_b = False except Exception as e: logging.error(f'{SPACER} Error: {e}, Traceback {traceback.format_exc()}') while not (finished_a and finished_b): if not finished_a: try: text_a = next(text_streamer_a) if tokenizer_a.eos_token in text_a: eot_location = text_a.find(tokenizer_a.eos_token) text_a = text_a[:eot_location] finished_a = True chatbot_a[-1][-1] += text_a yield chatbot_a, chatbot_b except StopIteration: finished_a = True except Exception as e: logging.error(f'{SPACER} Error: {e}, Traceback {traceback.format_exc()}') if not finished_b: try: text_b = next(text_streamer_b) if tokenizer_b.eos_token in text_b: eot_location = text_b.find(tokenizer_b.eos_token) text_b = text_b[:eot_location] finished_b = True chatbot_b[-1][-1] += text_b yield chatbot_a, chatbot_b except StopIteration: finished_b = True except Exception as e: logging.error(f'{SPACER} Error: {e}, Traceback {traceback.format_exc()}') return chatbot_a, chatbot_b def clear(): return [], [] def reveal_bot(selection, chatbot_a, chatbot_b): if selection == "Bot A kicks ass!": chatbot_a.append(["🏆", f"Thanks, man. I am {model_info[0]['name']}"]) chatbot_b.append(["💩", f"Pffff … I am {model_info[1]['name']}"]) elif selection == "Bot B crushes it!": chatbot_a.append(["🤡", f"Rigged … I am {model_info[0]['name']}"]) chatbot_b.append(["🥇", f"Well deserved! I am {model_info[1]['name']}"]) else: chatbot_a.append(["🤝", f"Lame … I am {model_info[0]['name']}"]) chatbot_b.append(["🤝", f"Dunno. I am {model_info[1]['name']}"]) return chatbot_a, chatbot_b with gr.Blocks() as demo: try: with gr.Column(): gr.HTML(htmL_info) with gr.Row(variant="compact"): with gr.Column(scale=0): language_dropdown = gr.Dropdown( choices=["English", "German", "French", "Spanish"], label="Select Language for System Prompt", value="English" ) with gr.Column(): system_prompt = gr.Textbox( lines=1, label="System Prompt", value=system_prompts["English"], show_copy_button=True ) with gr.Row(variant="panel"): with gr.Column(scale=1): submit_btn = gr.Button(value="Generate", variant="primary") clear_btn = gr.Button(value="Clear", variant="secondary") input_text = gr.Textbox(lines=1, label="Prompt", value="Write a Nike style ad headline about the shame of being second best.", scale=3, show_copy_button=True) with gr.Row(variant="panel"): with gr.Column(): chatbot_a = gr.Chatbot(label="Model A", show_copy_button=True, height=500) with gr.Column(): chatbot_b = gr.Chatbot(label="Model B", show_copy_button=True, height=500) with gr.Row(variant="panel"): better_bot = gr.Radio(["Bot A kicks ass!", "Bot B crushes it!", "It's a draw."], label="Rate the output!") with gr.Accordion(label="Generation Configurations", open=False): max_new_tokens = gr.Slider(minimum=128, maximum=4096, value=512, label="Max new tokens", step=128) temperature = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, label="Temperature", step=0.01) top_p = gr.Slider(minimum=0.0, maximum=1.0, value=1.0, label="Top_p", step=0.01) repetition_penalty = gr.Slider(minimum=0.1, maximum=2.0, value=1.1, label="Repetition Penalty", step=0.1) language_dropdown.change( lambda lang: system_prompts[lang], inputs=[language_dropdown], outputs=[system_prompt] ) better_bot.select(reveal_bot, inputs=[better_bot, chatbot_a, chatbot_b], outputs=[chatbot_a, chatbot_b]) input_text.submit(generate_both, inputs=[system_prompt, input_text, chatbot_a, chatbot_b, max_new_tokens, temperature, top_p, repetition_penalty], outputs=[chatbot_a, chatbot_b]) submit_btn.click(generate_both, inputs=[system_prompt, input_text, chatbot_a, chatbot_b, max_new_tokens, temperature, top_p, repetition_penalty], outputs=[chatbot_a, chatbot_b]) clear_btn.click(clear, outputs=[chatbot_a, chatbot_b]) except Exception as e: logging.error(f'{SPACER} Error: {e}, Traceback {traceback.format_exc()}') if __name__ == "__main__": demo.queue().launch()