|
from llama_cpp import Llama |
|
import gradio as gr |
|
import os |
|
import requests |
|
import time |
|
|
|
|
|
# Local filename where the quantized GGUF model weights are stored/expected.
MODEL_PATH = "qwen2.5-0.5b-instruct-q4_k_m.gguf"

# Hugging Face URL used to fetch the model when it is missing locally.
MODEL_URL = "https://huggingface.co/Qwen/Qwen2.5-0.5B-Instruct-GGUF/resolve/main/qwen2.5-0.5b-instruct-q4_k_m.gguf"
|
|
|
def download_model():
    """Download the GGUF model file if it is missing or looks corrupted.

    Streams MODEL_URL to MODEL_PATH, logging progress at most once per
    whole percent.  A partially written file is removed on failure.

    Returns:
        bool: True when a plausible model file exists at MODEL_PATH,
        False when the download failed.
    """
    # A real GGUF model is far larger than this; anything smaller is
    # treated as a truncated download or an HTML error page.
    min_valid_size = 100000

    if os.path.exists(MODEL_PATH):
        if os.path.getsize(MODEL_PATH) >= min_valid_size:
            print("✅ Model already exists!")
            return True
        # Existing file looks truncated: remove it and fall through to
        # the download path below (original recursed here instead).
        print("❌ Existing file seems corrupted, re-downloading...")
        os.remove(MODEL_PATH)

    print("📥 Downloading Qwen2.5-0.5B-Instruct model...")
    try:
        response = requests.get(MODEL_URL, stream=True, timeout=300)
        response.raise_for_status()

        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0
        last_percent = -1  # last whole percent already logged

        with open(MODEL_PATH, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        # BUG FIX: the original printed one progress line
                        # per 8 KiB chunk (tens of thousands of lines for a
                        # ~400 MB file).  Log only when the whole percent
                        # actually changes.
                        percent = int((downloaded / total_size) * 100)
                        if percent != last_percent:
                            last_percent = percent
                            print(f"📥 Download progress: {percent:.1f}%")

        if os.path.getsize(MODEL_PATH) < min_valid_size:
            print("❌ Downloaded file seems corrupted")
            os.remove(MODEL_PATH)
            return False

        print("✅ Model downloaded successfully!")
        return True

    except Exception as e:
        print(f"❌ Error downloading model: {e}")
        # Never leave a half-written file behind.
        if os.path.exists(MODEL_PATH):
            os.remove(MODEL_PATH)
        return False
|
|
|
|
|
# --- Model bootstrap -------------------------------------------------
# Ensure the weights are on disk, then try to load them.  `llm` remains
# None when either step fails, so downstream code can fall back to
# canned responses.
model_loaded = download_model()
llm = None

if not model_loaded:
    print("❌ Model not available, using fallback responses")
else:
    # CPU-only, low-memory configuration for a small quantized model.
    llama_kwargs = dict(
        model_path=MODEL_PATH,
        n_ctx=2048,
        n_threads=4,
        n_batch=256,
        use_mlock=False,
        verbose=False,
        n_gpu_layers=0,
        use_mmap=True,
        low_vram=True,
        rope_scaling_type=1,
        rope_freq_base=10000.0,
    )
    try:
        llm = Llama(**llama_kwargs)
        print("✅ Qwen2.5-0.5B Model loaded successfully!")
    except Exception as e:
        print(f"❌ Error loading model: {e}")
        llm = None
|
|
|
|
|
# ChatML-style system prompt prepended to every generation request.
# The <|im_start|>/<|im_end|> markers are the chat-template delimiters
# this model's prompt format expects; keep them intact.
system_prompt = """<|im_start|>system
You are an expert D&D Dungeon Master. Create immersive, engaging adventures with vivid descriptions. Always end your responses with a question or choice for the player. Keep responses concise but atmospheric.
<|im_end|>"""
|
|
|
def generate_random_opening():
    """Produce the opening text for a fresh adventure.

    Uses the loaded LLM when available; otherwise (or on any generation
    error) falls back to hand-written openings.

    Returns:
        str: Banner plus the opening scene, ending with a player prompt.
    """
    banner = "🌟 **New Adventure!** 🌟"

    if llm is None:
        # Model unavailable: pick one of a few canned scenes.
        import random
        openings = [
            "You enter a torch-lit dungeon. Water drips from ancient stones. A passage splits left and right. Which way?",
            "You're in a misty forest clearing. An old well sits in the center, rope disappearing into darkness. Investigate?",
            "The tavern door creaks open. Hooded figures look up from their ale. The barkeep waves you over. Approach?"
        ]
        return f"{banner}\n\n{random.choice(openings)}"

    try:
        opening_prompt = f"""{system_prompt}
<|im_start|>user
Generate a creative D&D adventure opening in 2-3 sentences. Set an intriguing scene and end with a question for the player.
<|im_end|>
<|im_start|>assistant"""

        output = llm(
            opening_prompt,
            max_tokens=80,
            temperature=0.8,
            top_p=0.9,
            repeat_penalty=1.1,
            stop=["<|im_end|>", "<|im_start|>", "User:", "Player:"]
        )
        opening = output["choices"][0]["text"].strip()

        # Guarantee the scene ends by prompting the player for an action.
        if not opening.endswith('?'):
            opening = opening + " What do you do?"

        return f"{banner}\n\n{opening}"

    except Exception as e:
        print(f"Error generating opening: {e}")
        return f"{banner}\n\nYou find yourself in a mysterious place. Strange things are happening. What do you do?"
|
|
|
# Server-side rolling conversation memory: a list of
# {"user": ..., "ai": ...} turn dicts, trimmed to the 5 most recent.
chat_history = []
|
|
|
def generate_dm_response_with_timeout(message, timeout=30):
    """Generate the DM's reply to *message* with the local LLM.

    Falls back to a canned response when the model is unavailable or
    generation raises.  *timeout* is a soft budget only: llama_cpp's
    call is blocking and cannot be aborted mid-generation, so the
    elapsed time is checked after the fact.

    Args:
        message: The player's latest action text.
        timeout: Soft time budget in seconds (used for logging only).

    Returns:
        str: The DM's reply.
    """
    if llm is None:
        # Model unavailable: keep the game moving with a canned prompt.
        import random
        fallbacks = [
            "The path ahead is unclear. What's your next move?",
            "You hear footsteps approaching. How do you react?",
            "A mysterious door appears before you. Do you open it?",
            "The ground trembles slightly. What do you do?",
            "You find a strange artifact. Examine it closely?"
        ]
        return random.choice(fallbacks)

    try:
        # Rebuild the ChatML prompt from the last few remembered turns so
        # the model keeps short-term context without the prompt growing
        # past the 2048-token context window.
        prompt = f"{system_prompt}\n"
        context_turns = min(len(chat_history), 3)
        for turn in chat_history[-context_turns:]:
            prompt += f"<|im_start|>user\n{turn['user']}\n<|im_end|>\n"
            prompt += f"<|im_start|>assistant\n{turn['ai']}\n<|im_end|>\n"
        prompt += f"<|im_start|>user\n{message}\n<|im_end|>\n<|im_start|>assistant\n"

        start_time = time.time()
        output = llm(
            prompt,
            max_tokens=100,
            stop=["<|im_end|>", "<|im_start|>", "User:", "Player:"],
            temperature=0.7,
            top_p=0.8,
            repeat_penalty=1.2,
            top_k=40,
            min_p=0.1
        )
        elapsed_time = time.time() - start_time

        text = output["choices"][0]["text"].strip()

        # BUG FIX: the original discarded the fully generated reply when
        # elapsed_time exceeded the budget and returned filler instead.
        # The time was already spent by then, so discarding the text only
        # hurt the player; keep the reply and just log the overrun.
        if elapsed_time > timeout:
            print(f"Response took {elapsed_time:.1f}s (timeout: {timeout}s)")

        # Nudge the reply to end as a complete sentence or question.
        if not text.endswith(('?', '!', '.')):
            text += "?"

        print(f"✅ Response generated in {elapsed_time:.1f}s")
        return text

    except Exception as e:
        print(f"Error generating response: {e}")
        return "Something unexpected happens. What do you do next?"
|
|
|
def chat(message, history):
    """Handle one player turn: get a DM reply and record the exchange.

    `history` is accepted to satisfy the caller's signature but unused;
    conversation memory lives in the module-level `chat_history`.

    Returns:
        str: The DM's reply (or a nudge when the message is blank).
    """
    global chat_history

    # Blank input: prompt the player instead of invoking the model.
    if not message.strip():
        return "You stand there, unsure. What would you like to do?"

    dm_response = generate_dm_response_with_timeout(message)

    # Remember this exchange, keeping only the five most recent turns.
    chat_history.append({"user": message, "ai": dm_response})
    chat_history = chat_history[-5:]

    return dm_response
|
|
|
def reset():
    """Forget all remembered turns and return a fresh opening scene."""
    global chat_history
    chat_history = []  # wipe server-side memory before the new adventure
    return generate_random_opening()
|
|
|
|
|
# ---------------------------------------------------------------------
# Gradio UI: transcript, action textbox, and the Act / New Adventure
# buttons.  Component creation order determines the page layout.
# NOTE(review): Chatbot uses the legacy tuple format [(user, bot), ...];
# newer Gradio releases prefer type="messages" — confirm before upgrading.
with gr.Blocks(title="Infinite Dungeon - Lightning Fast", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# ⚡ Infinite Dungeon - Lightning Fast")
    gr.Markdown("*Powered by Qwen2.5-0.5B - Optimized for 5-15 second responses*")
    gr.Markdown("🚀 **Super fast AI D&D with perfect memory retention**")

    # Transcript seeded with a welcome message (bot-only entry).
    chatbot = gr.Chatbot(
        value=[(None, "⚡ **Lightning Fast Adventure Ready!** ⚡\n\nPress 'New Adventure' to begin your quest!")],
        height=400,
        show_label=False
    )

    # Player's input box; Enter submits (wired below).
    msg = gr.Textbox(
        label="Your action",
        placeholder="What do you do? (e.g., 'I search the room', 'I attack the orc', 'I cast a spell')",
        max_lines=2
    )

    with gr.Row():
        submit = gr.Button("⚔️ Act", variant="primary", size="lg")
        reset_btn = gr.Button("🔄 New Adventure", variant="secondary")

    gr.Markdown("⚡ **Ultra-fast responses**: 5-15 seconds | 🧠 **Perfect memory**: Never forgets your adventure!")

    def respond(message, chat_history_ui):
        """Submit handler: clear the textbox and append (user, bot) to the transcript."""
        # Ignore blank submissions; keep the transcript unchanged.
        if not message.strip():
            return "", chat_history_ui

        # Placeholder bubble; replaced below once generation returns.
        chat_history_ui.append((message, "🎲 *The DM is thinking...*"))

        bot_message = chat(message, chat_history_ui)
        chat_history_ui[-1] = (message, bot_message)

        # First output clears the textbox, second updates the transcript.
        return "", chat_history_ui

    def reset_chat():
        """Reset handler: wipe server-side memory and seed a new opening scene."""
        new_opening = reset()
        return [(None, new_opening)]

    # Event wiring: Enter in the textbox and the Act button both submit;
    # the reset button replaces the whole transcript.
    msg.submit(respond, [msg, chatbot], [msg, chatbot])
    submit.click(respond, [msg, chatbot], [msg, chatbot])
    reset_btn.click(reset_chat, outputs=[chatbot])
|
|
|
|
|
if __name__ == "__main__":
    # Bind to all interfaces (container/Spaces friendly) on Gradio's
    # default port; surface server errors in the UI for debugging.
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )