Spaces:
Running
Running
import json | |
import os | |
import shutil | |
import time | |
import uuid | |
import zipfile | |
import boto3 | |
import gradio as gr | |
import requests | |
import uvicorn | |
from botocore.config import Config | |
from fastapi import FastAPI | |
from fastapi.staticfiles import StaticFiles | |
# Define paths | |
S3_BUCKET = os.environ.get("S3_BUCKET") | |
UPLOAD_DIR = "uploads" | |
OUTPUT_DIR = "static/output" | |
# Ensure base directories exist | |
os.makedirs(UPLOAD_DIR, exist_ok=True) | |
os.makedirs(OUTPUT_DIR, exist_ok=True) | |
# Initialize AWS clients | |
s3_client = boto3.client("s3") | |
# Create Lambda client with increased timeout | |
lambda_client = boto3.client( | |
"lambda", | |
region_name="us-west-2", | |
config=Config(read_timeout=600, connect_timeout=600), | |
) | |
def handle_file_upload(fileobj): | |
if fileobj is not None: | |
file_path = os.path.join(UPLOAD_DIR, os.path.basename(fileobj.name)) | |
shutil.copyfile(fileobj.name, file_path) | |
return file_path | |
return None | |
def upload_to_s3(file_path, s3_key): | |
s3_client.upload_file(file_path, S3_BUCKET, s3_key) | |
return f"s3://{S3_BUCKET}/{s3_key}" | |
def invoke_lambda_function(story_prompt, lore_file_s3_path): | |
payload = { | |
"story_prompt": story_prompt, | |
"lore_file": lore_file_s3_path, | |
} | |
try: | |
print("Building your game... This may take a few minutes.") | |
response = lambda_client.invoke( | |
FunctionName="renpy_builder", | |
InvocationType="RequestResponse", | |
Payload=json.dumps(payload), | |
) | |
response_payload = json.loads(response["Payload"].read()) | |
print("Lambda response:", response_payload) | |
if response_payload.get("success", False): | |
return response_payload["download_url"] | |
else: | |
raise Exception(response_payload.get("message", "Unknown error")) | |
except Exception as e: | |
print(f"Error invoking Lambda: {str(e)}") | |
return None | |
def load_existing_game(session_id): | |
"""Load an existing game by session ID""" | |
if not session_id: | |
return gr.HTML("Please enter a session ID") | |
# Check if the game exists | |
game_path = os.path.join(OUTPUT_DIR, session_id) | |
if not os.path.exists(os.path.join(game_path, "index.html")): | |
return gr.HTML(f"No game found with session ID: {session_id}") | |
# Generate a unique query parameter to force iframe reload | |
timestamp = int(time.time()) | |
# Create HTML with iframe | |
gradio_html = f""" | |
<div style="display: flex; flex-direction: column; align-items: center;"> | |
<iframe width="1280" height="720" src="static/output/{session_id}/index.html?v={timestamp}"></iframe> | |
<p style="text-align: center; margin-top: 10px;"> | |
Instructions: Use your mouse to interact with the game. Click to advance dialogue. | |
</p> | |
</div> | |
""" | |
return gr.HTML(gradio_html) | |
def build_and_display_game(story_prompt, lore_file): | |
try: | |
# Check if prompt is empty or just whitespace | |
if not story_prompt or not story_prompt.strip(): | |
return gr.HTML( | |
""" | |
<div style='color: red; padding: 10px; border: 1px solid red; border-radius: 5px;'> | |
Error: Please enter a story prompt. This field cannot be empty. | |
</div> | |
""" | |
) | |
# Validate lore file if provided | |
if lore_file: | |
# Get file extension | |
file_extension = os.path.splitext(lore_file.name)[1].lower() | |
# Check if it's a txt file | |
if file_extension != ".txt": | |
return gr.HTML( | |
""" | |
<div style='color: red; padding: 10px; border: 1px solid red; border-radius: 5px;'> | |
Error: Lore file must be a .txt file. Please upload a valid text file. | |
</div> | |
""" | |
) | |
# Check file size (e.g., limit to 1MB) | |
if os.path.getsize(lore_file.name) > 1_000_000: # 1MB limit | |
return gr.HTML( | |
""" | |
<div style='color: red; padding: 10px; border: 1px solid red; border-radius: 5px;'> | |
Error: Lore file is too large. Please keep file size under 1MB. | |
</div> | |
""" | |
) | |
# Generate a unique ID for this game session | |
session_id = str(uuid.uuid4()) | |
# Handle file upload for lore file | |
lore_file_s3_path = None | |
if lore_file: | |
try: | |
local_path = handle_file_upload(lore_file) | |
if not local_path: | |
return gr.HTML("<p>Error: Failed to process lore file</p>") | |
s3_key = f"lore_files/{session_id}/{os.path.basename(local_path)}" | |
lore_file_s3_path = upload_to_s3(local_path, s3_key) | |
if not lore_file_s3_path: | |
return gr.HTML("<p>Error: Failed to upload lore file to S3</p>") | |
except Exception as e: | |
return gr.HTML(f"<p>Error processing lore file: {str(e)}</p>") | |
# Call lambda function and get download URL | |
download_url = invoke_lambda_function(story_prompt, lore_file_s3_path) | |
if not download_url: | |
return gr.HTML( | |
"<p>Error: Failed to get download URL from Lambda function</p>" | |
) | |
# Download zip from the download_url | |
local_zip_path = f"/tmp/{session_id}_game.zip" | |
try: | |
response = requests.get(download_url) | |
response.raise_for_status() # Raise an exception for bad status codes | |
with open(local_zip_path, "wb") as f: | |
f.write(response.content) | |
except Exception as e: | |
return gr.HTML(f"<p>Error downloading game files: {str(e)}</p>") | |
# Validate zip file before extracting | |
if not zipfile.is_zipfile(local_zip_path): | |
return gr.HTML("<p>Error: Invalid game file received</p>") | |
# Unzip to /static/output/{session_id}/ | |
output_path = os.path.join(OUTPUT_DIR, session_id) | |
try: | |
with zipfile.ZipFile(local_zip_path, "r") as zip_ref: | |
# Check for index.html in zip file | |
if "index.html" not in zip_ref.namelist(): | |
return gr.HTML("<p>Error: Invalid game file structure</p>") | |
zip_ref.extractall(output_path) | |
except Exception as e: | |
return gr.HTML(f"<p>Error extracting game files: {str(e)}</p>") | |
# Generate a unique query parameter to force iframe reload | |
timestamp = int(time.time()) | |
# Create HTML with iframe | |
gradio_html = f""" | |
<div style="display: flex; flex-direction: column; align-items: center;"> | |
<p style="margin-bottom: 10px;"><strong>Session ID:</strong> {session_id}</p> | |
<iframe | |
width="1280" | |
height="720" | |
src="static/output/{session_id}/index.html?v={timestamp}" | |
style="border: none;" | |
></iframe> | |
<p style="text-align: center; margin-top: 10px;"> | |
Instructions: Use your mouse to interact with the game. Click to advance dialogue. | |
</p> | |
</div> | |
""" | |
return gr.HTML(gradio_html) | |
except Exception as e: | |
# Log the error for debugging | |
print(f"Unexpected error: {str(e)}") | |
return gr.HTML( | |
""" | |
<div style='color: red; padding: 10px; border: 1px solid red; border-radius: 5px;'> | |
An unexpected error occurred. Please try again later. | |
</div> | |
""" | |
) | |
except Exception as e: | |
print(f"Error in build_and_display_game: {str(e)}") | |
return gr.HTML(f"<p>Error: {str(e)}</p>") | |
# Create Gradio interface | |
with gr.Blocks() as demo: | |
gr.Markdown("# VisualNovelLM") | |
gr.Markdown( | |
"Enter a prompt to generate a story, and optionally upload a document with additional lore." | |
) | |
with gr.Row(): | |
story_prompt = gr.Textbox( | |
label="Enter story prompt", | |
placeholder="Describe the story you want to generate. Include details about the setting, main characters, and key plot points.", | |
lines=5, | |
) | |
lore_file = gr.File( | |
label="Upload lore document (optional)", file_types=["text"] | |
) | |
# Add example prompts | |
gr.Examples( | |
examples=[ | |
[ | |
"Create a romantic comedy visual novel set in a bustling city. The main character is a young professional who accidentally swaps phones with their dream date. Include funny misunderstandings and heartwarming moments as they try to return the phone and potentially find love.", | |
None, | |
], | |
[ | |
"Generate a mystery visual novel set in a remote mountain village. The protagonist is a detective investigating a series of strange disappearances. Include red herrings, multiple suspects, and a surprising twist at the end.", | |
None, | |
], | |
[ | |
"Develop a sci-fi visual novel aboard a space station. The main character is a new crew member who discovers an alien artifact that grants them the ability to see glimpses of the future. Explore the ethical implications and potential dangers of this power.", | |
None, | |
], | |
[ | |
"Create a fantasy visual novel in a magic school setting. The protagonist is a student with unique abilities that make them an outcast. Include scenes of learning magic, making friends, and ultimately saving the school from a ancient threat.", | |
None, | |
], | |
[ | |
"Generate a historical visual novel set during the Renaissance in Florence. The main character is an apprentice artist trying to make a name for themselves. Include interactions with famous historical figures and a plot involving art forgery and political intrigue.", | |
None, | |
], | |
], | |
inputs=[story_prompt, lore_file], | |
label="Example Prompts", | |
cache_examples=True, # Enable caching for examples | |
outputs=gr.HTML(label="Game Output"), | |
fn=build_and_display_game, | |
run_on_click=False, | |
) | |
generate_button = gr.Button("Generate Game") | |
output = gr.HTML(label="Game Output") | |
generate_button.click( | |
fn=build_and_display_game, inputs=[story_prompt, lore_file], outputs=output | |
) | |
# with gr.Tab("Load Existing Game"): | |
# gr.Markdown("Enter a session ID to load an existing game") | |
# session_id_input = gr.Textbox( | |
# label="Session ID", placeholder="Enter the session ID of an existing game" | |
# ) | |
# load_button = gr.Button("Load Game") | |
# load_output = gr.HTML(label="Game Output") | |
# load_button.click( | |
# fn=load_existing_game, inputs=session_id_input, outputs=load_output | |
# ) | |
# Create FastAPI app | |
app = FastAPI() | |
app.mount("/static", StaticFiles(directory="static", html=True), name="static") | |
app = gr.mount_gradio_app(app, demo, "/") | |
uvicorn.run(app, host="0.0.0.0", port=7860) | |