Minecraft / app.py
Staticaliza's picture
Update app.py
ef327d3 verified
# Imports
import gradio as gr
import spaces, requests, os, time, random, threading, subprocess, shutil, tarfile, urllib.request, json
from huggingface_hub import hf_hub_download, upload_file
# Variables
# Secrets are read from the environment; each one falls back to "" when unset.
PASSWORD = os.environ.get("PASSWORD", "")  # checked by get_address
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "")  # checked by the administrator handlers
TCP_TOKEN = os.environ.get("TCP_TOKEN", "")  # passed to ngrok as --authtoken
HF_TOKEN = os.environ.get("HF_TOKEN", "")  # used for Hugging Face Hub download/upload
SERVER_ID = "purpur"  # start_server supports "purpur" and "forge"
WORLD_ID = "staticraft_v1"  # names the cloud archive "<WORLD_ID>.tar.gz"; reassigned by load_manual
HF_REPO_ID = "Staticaliza/Minecraft-Saves"  # Hub repository holding the world archives
BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # directory containing this file; server working dir
OP_UUIDS = ["73526457-17fc-48e0-b1ab-b08403f6aa4c"]  # written into ops.json by create_ops_file
# Mutable runtime state shared between the worker threads and the UI handlers.
PUBLIC_TCP = None  # public tunnel address returned by get_address; None until known
SERVER_ONLINE = False  # toggled by start_tcp
SERVER_PROCESS = None  # subprocess.Popen handle of the running server, or None
TCP_PROCESS = None  # placeholder for the tunnel process handle
SERVER_OUTPUT_BUFFER = []  # server stdout lines collected by read_server_output
ONLINE_PLAYERS = set()  # player names parsed from join/leave log lines
TIMEZONE = "US/Eastern"  # NOTE(review): not referenced by the code visible here
RESTART_INTERVAL_HOURS = 6  # NOTE(review): not referenced by the code visible here
# Stylesheet handed to gr.Blocks(css=...).
css = '''
.gradio-container{max-width: 560px !important; margin: auto;}
h1{text-align:center}
footer {
visibility: hidden
}
'''
# Functions
def _player_name_from_log(line, verb):
    """Return the player name from a "<name> <verb> the game" log line, or None.

    The name is the text between the first ``INFO]:`` marker and ``" <verb>"``.
    """
    _, marker, message = line.partition("INFO]:")
    if not marker:
        return None
    name = message.split(f" {verb}")[0].strip()
    return name or None


def read_server_output():
    """Drain the server's stdout forever and keep ONLINE_PLAYERS up to date.

    Intended to run in a daemon thread. Every line read is appended to
    SERVER_OUTPUT_BUFFER. A cloud save is triggered when the first player
    joins and when the last player leaves.
    """
    global SERVER_OUTPUT_BUFFER, ONLINE_PLAYERS
    max_buffered_lines = 2000  # cap so a long-running server cannot grow the buffer forever
    while True:
        # Snapshot the handle once: other threads set SERVER_PROCESS to None
        # during a restart, which would otherwise race with the readline below.
        process = SERVER_PROCESS
        if process is None or process.stdout is None:
            time.sleep(0.1)
            continue
        line = process.stdout.readline()
        if not line:
            # EOF or nothing available yet; avoid a busy loop.
            time.sleep(0.1)
            continue
        buffer = SERVER_OUTPUT_BUFFER
        buffer.append(line)
        if len(buffer) > max_buffered_lines:
            del buffer[:-max_buffered_lines]
        should_save = False
        if " joined the game" in line:
            name = _player_name_from_log(line, "joined")
            if name is not None:
                ONLINE_PLAYERS.add(name)
                should_save = len(ONLINE_PLAYERS) == 1
        elif " left the game" in line:
            name = _player_name_from_log(line, "left")
            if name is not None:
                ONLINE_PLAYERS.discard(name)
                should_save = not ONLINE_PLAYERS
        if should_save:
            try:
                save_world_to_cloud()
            except Exception as e:
                # Keep the reader alive; losing it would stop player tracking.
                print("[SERVER] | Error: save triggered by player event failed:", e)
def is_within_directory(directory, target):
    """Return True when *target* is *directory* itself or lies beneath it.

    Both paths are made absolute (and ``..`` segments collapsed) before the
    comparison. The check is done per path component, so a sibling such as
    ``/base/world_evil`` is not mistaken for something inside ``/base/world``.
    Symbolic links are not resolved.
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)
    try:
        return os.path.commonpath([abs_directory, abs_target]) == abs_directory
    except ValueError:
        # Raised when the paths cannot share a root (e.g. different drives).
        return False
def safe_extract(tar, path=".", members=None, *, numeric_owner=False):
    """Extract *tar* into *path* after rejecting entries that escape it.

    Every member of the archive is validated, even when *members* restricts
    what is extracted. A member is rejected when its own path, or the target
    of a symbolic/hard link, resolves outside *path*.

    Args:
        tar: An open ``tarfile.TarFile``.
        path: Destination directory.
        members: Optional subset of members forwarded to ``extractall``.
        numeric_owner: Forwarded to ``extractall``.

    Raises:
        Exception: If an entry or link target would land outside *path*.
    """
    for member in tar.getmembers():
        member_path = os.path.join(path, member.name)
        if not is_within_directory(path, member_path):
            raise Exception("[SAFE EXTRACT] | Error: Attempted Path Traversal in Tar File")
        if member.issym() or member.islnk():
            # Symlink targets are relative to the link's own directory;
            # hard-link targets are relative to the archive root.
            link_base = os.path.dirname(member_path) if member.issym() else path
            link_target = os.path.join(link_base, member.linkname)
            if not is_within_directory(path, link_target):
                raise Exception("[SAFE EXTRACT] | Error: Attempted Link Traversal in Tar File")
    extract_options = {"numeric_owner": numeric_owner}
    if hasattr(tarfile, "data_filter"):
        # Use the stdlib extraction filter where this Python version provides it.
        extract_options["filter"] = "data"
    tar.extractall(path, members, **extract_options)
# Serialises saves: the periodic saver, the output reader and the UI handlers
# all call save_world_to_cloud and share one archive path.
_SAVE_LOCK = threading.Lock()


def _send_server_command(process, command):
    """Write one console command to the server's stdin and flush it."""
    process.stdin.write(command + "\n")
    process.stdin.flush()


def save_world_to_cloud():
    """Archive the world directories and upload them to the Hub repository.

    When the server is running, saving is paused with ``save-off``, a
    ``save-all`` is issued and given 15 seconds, and ``save-on`` is sent only
    after the archive has been written, so files are not changed while they
    are being archived. The archive is ``<WORLD_ID>.tar.gz`` in BASE_DIR and
    contains whichever of ``world``, ``world_nether`` and ``world_the_end``
    exist. Failures are printed, not raised.
    """
    with _SAVE_LOCK:
        print(f"[SAVE] | Saving current world '{WORLD_ID}' to cloud...")
        process = SERVER_PROCESS
        saving_paused = False
        if process is not None and process.poll() is None:
            try:
                _send_server_command(process, "save-off")
                saving_paused = True
                _send_server_command(process, "save-all")
                time.sleep(15)
            except Exception as e:
                print("[SAVE] | Error: failed while issuing save commands:", e)
        tar_path = os.path.join(BASE_DIR, f"{WORLD_ID}.tar.gz")
        archived = False
        try:
            with tarfile.open(tar_path, "w:gz") as tar:
                for dimension in ("world", "world_nether", "world_the_end"):
                    dimension_dir = os.path.join(BASE_DIR, dimension)
                    if os.path.exists(dimension_dir):
                        tar.add(dimension_dir, arcname=dimension, recursive=True)
            archived = True
        except Exception as e:
            print(f"[SAVE] | Error: failed to create tar file for world '{WORLD_ID}':", e)
        finally:
            if saving_paused:
                try:
                    _send_server_command(process, "save-on")
                except Exception as e:
                    print("[SAVE] | Error: failed while issuing save commands:", e)
        if not archived:
            return
        # Upload only after the archive has been closed so the gzip stream is complete.
        try:
            upload_file(path_or_fileobj=tar_path, path_in_repo=f"{WORLD_ID}.tar.gz", repo_id=HF_REPO_ID, token=HF_TOKEN, repo_type="model", commit_message=f"Autosave {WORLD_ID} world data.")
            print(f"[SAVE] | World '{WORLD_ID}' saved to cloud.")
        except Exception as e:
            print(f"[SAVE] | Error: failed to upload world '{WORLD_ID}':", e)
def load_world_from_cloud():
    """Replace the local world directories with the cloud backup of WORLD_ID.

    Downloads ``<WORLD_ID>.tar.gz`` from the Hub repository. If the download
    fails the local files are left untouched. Otherwise ``world``,
    ``world_nether`` and ``world_the_end`` are removed first, so files from a
    previous state cannot mix with the restored backup, and the archive is
    extracted into BASE_DIR. Failures are printed, not raised.
    """
    print(f"[LOAD] | Loading current world '{WORLD_ID}' from cloud...")
    try:
        file_path = hf_hub_download(repo_id=HF_REPO_ID, filename=f"{WORLD_ID}.tar.gz", token=HF_TOKEN)
    except Exception as e:
        print(f"[LOAD] | Error: failed to download world backup file '{WORLD_ID}':", e)
        return
    for dimension in ("world", "world_nether", "world_the_end"):
        dimension_path = os.path.join(BASE_DIR, dimension)
        if os.path.exists(dimension_path):
            shutil.rmtree(dimension_path)
    # The overworld directory always exists after a load attempt; callers
    # test it for emptiness to detect a missing backup.
    os.makedirs(os.path.join(BASE_DIR, "world"), exist_ok=True)
    try:
        with tarfile.open(file_path, "r:gz") as tar:
            safe_extract(tar, path=BASE_DIR)
        print(f"[LOAD] | World '{WORLD_ID}' loaded from cloud.")
    except Exception as e:
        print(f"[LOAD] | Error: failed to load world '{WORLD_ID}':", e)
def load_manual(password, new_world_id):
    """Switch the server to another world (administrator action).

    Kicks all players, stops the server, saves the current world to the
    cloud, removes every local world directory, loads (or creates) the
    requested world and starts the server again in a background thread.

    Args:
        password: Must equal ADMIN_PASSWORD.
        new_world_id: Name of the world to load; surrounding whitespace is ignored.

    Returns:
        A human-readable status message.
    """
    global SERVER_PROCESS, WORLD_ID
    if password != ADMIN_PASSWORD:
        return "Incorrect administrator password."
    requested_world = (new_world_id or "").strip()
    if not requested_world or requested_world == WORLD_ID:
        return f"No world provided; current world is: '{WORLD_ID}'"
    # The id becomes part of a local file name and a repository path.
    if "/" in requested_world or "\\" in requested_world or requested_world.startswith("."):
        return f"Invalid world name: '{requested_world}'"
    print(f"[LOAD MANUAL] | Performing manual load for world '{requested_world}' and unloading '{WORLD_ID}'...")
    execute_command(ADMIN_PASSWORD, f"kick @a The server is switching worlds from '{WORLD_ID}' to '{requested_world}'; please ask the server operator for the new server join address.")
    time.sleep(3)
    process = SERVER_PROCESS
    if process is not None:
        try:
            process.stdin.write("stop\n")
            process.stdin.flush()
        except Exception as e:
            print("[LOAD MANUAL] | Error sending stop command:", e)
        try:
            process.wait(timeout=10)
        except Exception as e:
            print("[LOAD MANUAL] | Server did not shut down gracefully, forcing termination:", e)
            process.terminate()
            try:
                # Make sure the process is gone before its files are archived.
                process.wait(timeout=30)
            except Exception:
                process.kill()
    SERVER_PROCESS = None
    old_world = WORLD_ID
    save_world_to_cloud()
    # Remove every dimension so the old world's nether/end cannot leak into the new one.
    for dimension in ("world", "world_nether", "world_the_end"):
        dimension_path = os.path.join(BASE_DIR, dimension)
        if os.path.exists(dimension_path):
            shutil.rmtree(dimension_path)
    world_path = os.path.join(BASE_DIR, "world")
    WORLD_ID = requested_world
    load_world_from_cloud()
    if not os.path.exists(world_path) or not os.listdir(world_path):
        print(f"[LOAD MANUAL] | Backup for world '{WORLD_ID}' not found; creating new world data...")
        os.makedirs(world_path, exist_ok=True)
        save_world_to_cloud()
    threading.Thread(target=start_server, daemon=True).start()
    return f"World transitioned from '{old_world}' to '{WORLD_ID}' successfully."
def save_manual(password):
    """Save the current world to the cloud on an administrator's request.

    Returns an error message when *password* does not equal ADMIN_PASSWORD,
    otherwise a confirmation message once the save call has returned.
    """
    authorized = password == ADMIN_PASSWORD
    if not authorized:
        return "Incorrect administrator password."
    print(f"[SAVE MANUAL] | Performing manual save for world '{WORLD_ID}'...")
    save_world_to_cloud()
    confirmation = f"World '{WORLD_ID}' has been saved."
    return confirmation
def periodic_save(interval=60):
    """Save the world every *interval* seconds while at least one player is online.

    Runs forever; intended for a daemon thread. An unexpected error during a
    save is printed and the loop continues, so one failure does not end
    autosaving.

    Args:
        interval: Seconds to sleep between checks.
    """
    while True:
        time.sleep(interval)
        if not ONLINE_PLAYERS:
            continue
        # Tagged separately from "[SAVE MANUAL]" so automatic and
        # administrator-triggered saves can be told apart in the log.
        print(f"[AUTOSAVE] | Performing autosave for world '{WORLD_ID}'...")
        try:
            save_world_to_cloud()
        except Exception as e:
            print("[AUTOSAVE] | Error: autosave failed:", e)
def _find_jdk21_home(extract_dir):
    """Return the path of the first ``jdk-21*`` entry in *extract_dir*, or None."""
    if not os.path.isdir(extract_dir):
        return None
    for item in sorted(os.listdir(extract_dir)):
        if item.startswith("jdk-21"):
            return os.path.join(extract_dir, item)
    return None


def install_java21_local():
    """Install a private Java 21 under BASE_DIR and expose it via JAVA_HOME/PATH.

    The JDK archive is downloaded to ``BASE_DIR/jdk-21.tar.gz`` and extracted
    into ``BASE_DIR/jdk-21``. An existing extraction that contains
    ``bin/java`` is reused without downloading again. The archive is deleted
    afterwards whether or not extraction succeeded. Failures are printed,
    not raised.
    """
    jdk_url = "https://download.oracle.com/java/21/latest/jdk-21_linux-x64_bin.tar.gz"
    download_path = os.path.join(BASE_DIR, "jdk-21.tar.gz")
    extract_dir = os.path.join(BASE_DIR, "jdk-21")
    java_folder = _find_jdk21_home(extract_dir)
    reusable = java_folder is not None and os.path.isfile(os.path.join(java_folder, "bin", "java"))
    if not reusable:
        try:
            print("[JAVA] | Downloading Java 21...")
            try:
                urllib.request.urlretrieve(jdk_url, download_path)
            except Exception as e:
                print("[JAVA] | Error: failed to download Java 21:", e)
                return
            os.makedirs(extract_dir, exist_ok=True)
            print("[JAVA] | Extracting Java 21...")
            try:
                with tarfile.open(download_path, "r:gz") as tar:
                    tar.extractall(path=extract_dir)
            except Exception as e:
                print("[JAVA] | Error: failed to extract Java 21:", e)
                return
        finally:
            # The archive is only needed for extraction; do not leave it on disk.
            try:
                if os.path.exists(download_path):
                    os.remove(download_path)
            except OSError as e:
                print("[JAVA] | Error: failed to remove Java 21 archive:", e)
        java_folder = _find_jdk21_home(extract_dir)
    if java_folder:
        os.environ["JAVA_HOME"] = java_folder
        os.environ["PATH"] = os.path.join(java_folder, "bin") + ":" + os.environ.get("PATH", "")
        print("[JAVA] | Java 21 installed at", java_folder)
    else:
        print("[JAVA] | Error: failed to locate Java 21 folder after extraction")
def create_ops_file():
    """Write ``ops.json`` in BASE_DIR with one level-4 entry per UUID in OP_UUIDS.

    Each entry uses the UUID for both its ``uuid`` and ``name`` fields.
    Failures are printed, not raised.
    """
    operator_entries = [
        {"uuid": operator_id, "name": operator_id, "level": 4, "bypassesPlayerLimit": False}
        for operator_id in OP_UUIDS
    ]
    destination = os.path.join(BASE_DIR, "ops.json")
    try:
        with open(destination, "w") as handle:
            json.dump(operator_entries, handle, indent=4)
    except Exception as e:
        print("[OPS] | Error: failed to create ops.json:", e)
    else:
        print("[OPS] | ops.json created.")
def install_tcp():
    """Ensure the ngrok binary exists in BASE_DIR and return its path.

    When ``BASE_DIR/ngrok`` is missing, the ngrok zip is downloaded and
    extracted into BASE_DIR. The binary is then made executable and the zip
    removed. Failures are printed, not raised, so the returned path may not
    exist if the download failed.

    Returns:
        The expected path of the ngrok binary.
    """
    # Imported here because the file's import list does not include zipfile;
    # without it the extraction below raised NameError, which the broad
    # handler reported as a download failure.
    import zipfile

    ngrok_path = os.path.join(BASE_DIR, "ngrok")
    zip_path = os.path.join(BASE_DIR, "ngrok.zip")
    if not os.path.exists(ngrok_path):
        print("[NGROK] | Downloading ngrok...")
        url = "https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip"
        try:
            urllib.request.urlretrieve(url, zip_path)
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall(BASE_DIR)
            print("[NGROK] | ngrok downloaded.")
        except Exception as e:
            print("[NGROK] | Error: failed to download ngrok:", e)
    try:
        os.chmod(ngrok_path, 0o755)
    except Exception as e:
        print("[NGROK] | Error: failed to set permissions on ngrok:", e)
    if os.path.exists(zip_path):
        os.remove(zip_path)
    return ngrok_path
def _fetch_ngrok_public_url(api_url="http://127.0.0.1:4040/api/tunnels", attempts=15, delay=2.0):
    """Poll the local ngrok agent API and return the first ``tcp://`` public URL.

    Returns None when no TCP tunnel is reported within *attempts* polls.
    """
    for _ in range(attempts):
        try:
            with urllib.request.urlopen(api_url, timeout=5) as response:
                payload = json.load(response)
            for tunnel in payload.get("tunnels", []):
                public_url = tunnel.get("public_url") or ""
                if public_url.startswith("tcp://"):
                    return public_url
        except (OSError, ValueError):
            # Agent not listening yet, or the response was not valid JSON; retry.
            pass
        time.sleep(delay)
    return None


def start_tcp():
    """(Re)start the ngrok TCP tunnel for port 25565 and publish its address.

    Any running ngrok is killed first. The new process handle is stored in
    TCP_PROCESS and the tunnel's public URL in PUBLIC_TCP (None when it could
    not be determined). SERVER_ONLINE is False while this runs and True when
    it returns.
    """
    global SERVER_ONLINE, PUBLIC_TCP, TCP_PROCESS
    SERVER_ONLINE = False
    PUBLIC_TCP = None
    subprocess.run(["pkill", "ngrok"])
    ngrok_path = install_tcp()
    subprocess.run(["pkill", "ngrok"])
    time.sleep(10)
    TCP_PROCESS = subprocess.Popen([ngrok_path, "tcp", "25565", "--authtoken", TCP_TOKEN], cwd=BASE_DIR)
    # Without this lookup PUBLIC_TCP would stay None and get_address could
    # never return a join address.
    PUBLIC_TCP = _fetch_ngrok_public_url()
    if PUBLIC_TCP is None:
        print("[NGROK] | Error: failed to obtain public tunnel address.")
    else:
        print("[NGROK] | Tunnel established at", PUBLIC_TCP)
    SERVER_ONLINE = True
def start_server():
    """Restore the world, launch the Minecraft server and open the TCP tunnel.

    Steps: load the world from the cloud, make sure Java is available
    (installing it locally if needed), accept the EULA, start the server
    selected by SERVER_ID ("purpur" or "forge"), make sure one output-reader
    thread is running, start the tunnel, and disable command feedback.
    If the server process cannot be started the function stops there and
    the server is not reported online. Failures are printed, not raised.
    """
    global SERVER_PROCESS, SERVER_ONLINE
    load_world_from_cloud()
    if shutil.which("java") is None:
        install_java21_local()
    java_cmd = shutil.which("java")
    if java_cmd is None:
        print("[SERVER] | Error: Java not found even after installation attempt.")
        return
    try:
        with open(os.path.join(BASE_DIR, "eula.txt"), "w") as f:
            f.write("eula=true")
    except Exception as e:
        print("[SERVER] | Error: failed to write eula.txt:", e)

    def launch(command):
        # Line-buffered text pipes: stdin carries console commands, stdout
        # (with stderr merged in) is consumed by read_server_output.
        return subprocess.Popen(command, cwd=BASE_DIR, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1)

    try:
        if SERVER_ID == "purpur":
            jar_file = "purpur.jar"
            SERVER_PROCESS = launch([java_cmd, "-jar", os.path.join(BASE_DIR, jar_file), "--nogui"])
        elif SERVER_ID == "forge":
            installer_jar = "forge-installer.jar"
            server_jar = "forge-1.21.4-54.1.0-shim.jar"
            if not os.path.exists(os.path.join(BASE_DIR, server_jar)):
                print("[SERVER] | Generating forge server jar via installer...")
                result = subprocess.run([java_cmd, "-jar", os.path.join(BASE_DIR, installer_jar), "--installServer"], cwd=BASE_DIR, capture_output=True, text=True)
                if result.returncode != 0:
                    print("[SERVER] | Error: forge installer failed:", result.stderr or result.stdout)
            run_script = os.path.join(BASE_DIR, "run.sh")
            if os.path.exists(run_script):
                os.chmod(run_script, 0o755)
                print("[SERVER] | Starting forge server using run.sh...")
                SERVER_PROCESS = launch(["bash", run_script])
            else:
                print("[SERVER] | run.sh not found, falling back to direct jar execution...")
                SERVER_PROCESS = launch([java_cmd, "-jar", os.path.join(BASE_DIR, server_jar), "--nogui"])
        else:
            print("[SERVER] | Error: unsupported server type.")
            return
    except Exception as e:
        print("[SERVER] | Error: failed to start server:", e)
        # Do not open a tunnel or report "online" for a server that never started.
        SERVER_ONLINE = False
        return
    # This function runs again on every restart; reuse a live reader rather
    # than adding another thread that reads the same pipe.
    reader_name = "server-output-reader"
    if not any(thread.name == reader_name for thread in threading.enumerate()):
        threading.Thread(target=read_server_output, daemon=True, name=reader_name).start()
    print("[SERVER] | Starting server...")
    start_tcp()
    print("[SERVER] | Server is online.")
    execute_command(ADMIN_PASSWORD, "gamerule sendCommandFeedback false")
def get_address(password):
    """Return the server join address for a caller who knows PASSWORD.

    Returns an explanatory message instead when the password is wrong, the
    server is not online yet, or no public address is known. A leading
    ``tcp://`` is stripped from the address.
    """
    if password != PASSWORD:
        return "Incorrect server password."
    if not SERVER_ONLINE:
        return "Server is being set up, please wait a few minutes until it goes online."
    if not PUBLIC_TCP:
        return "Cannot get address, please try again later."
    scheme = "tcp://"
    if PUBLIC_TCP.startswith(scheme):
        return PUBLIC_TCP[len(scheme):]
    return PUBLIC_TCP
def execute_command(password, cmd):
    """Send a console command to the running server (administrator action).

    After the command is written, the server output is watched for up to one
    second so an "Unknown or incomplete command" reply can be reported.

    Args:
        password: Must equal ADMIN_PASSWORD.
        cmd: Console command without a trailing newline.

    Returns:
        A human-readable status message.
    """
    global SERVER_OUTPUT_BUFFER
    if password != ADMIN_PASSWORD:
        return "Incorrect administrator password."
    process = SERVER_PROCESS
    if process is None or process.poll() is not None:
        return "Server not running."
    command = (cmd or "").strip()
    if not command:
        return "No command provided."
    # Start from an empty buffer so only output produced after this command is inspected.
    SERVER_OUTPUT_BUFFER = []
    try:
        process.stdin.write(command + "\n")
        process.stdin.flush()
    except Exception as e:
        return f"Error: {e}"
    # The reply arrives asynchronously through the reader thread; checking
    # the buffer immediately after the write would always find it empty.
    deadline = time.monotonic() + 1.0
    while True:
        for line in list(SERVER_OUTPUT_BUFFER):
            if "Unknown or incomplete command" in line:
                return "Unknown or incomplete command: " + command
        if time.monotonic() >= deadline:
            break
        time.sleep(0.1)
    return "Command executed."
def restart_server(password):
    """Restart the Minecraft server (administrator action).

    Kicks all players, stops the server, saves the world to the cloud and
    starts the server again in a background thread. The save matters because
    start_server restores the world from the cloud backup; without it,
    progress made since the last backup would be discarded.

    Args:
        password: Must equal ADMIN_PASSWORD.

    Returns:
        A human-readable status message.
    """
    global SERVER_PROCESS
    if password != ADMIN_PASSWORD:
        # Same wording as the other administrator handlers.
        return "Incorrect administrator password."
    execute_command(ADMIN_PASSWORD, "kick @a The server is restarting; please ask the server operator for the new server join address..")
    time.sleep(3)
    process = SERVER_PROCESS
    if process is not None:
        try:
            process.stdin.write("stop\n")
            process.stdin.flush()
        except Exception as e:
            print("[RESTART] | Error sending stop command:", e)
        try:
            process.wait(timeout=10)
        except Exception as e:
            print("[RESTART] | Server did not shut down gracefully, forcing termination:", e)
            process.terminate()
            try:
                # Make sure the process is gone before its files are archived.
                process.wait(timeout=30)
            except Exception:
                process.kill()
    SERVER_PROCESS = None
    save_world_to_cloud()
    threading.Thread(target=start_server, daemon=True).start()
    print("[RESTART] | Successfully restarted server.")
    return "Server has been restarted successfully."
def cloud():
    """Handler for the "Maintain Space" button; prints a log marker and returns nothing."""
    marker = "[CLOUD] | Space maintained."
    print(marker)
@spaces.GPU(duration=15)
def gpu():
    """Print a GPU log marker; wrapped by ``spaces.GPU`` with ``duration=15``."""
    # NOTE(review): not referenced by any handler in the code visible here.
    marker = "[GPU] | GPU maintained."
    print(marker)
# Initialize
# Write ops.json, then start the server and the autosave loop in daemon
# threads so the web UI can come up right away.
create_ops_file()
threading.Thread(target=start_server, daemon=True).start()
threading.Thread(target=periodic_save, daemon=True).start()

# Web UI: a public "get address" form and an administrator console.
with gr.Blocks(css=css) as main:
    with gr.Column():
        gr.Markdown("⛏️ • Staticraft is a Minecraft Java server provided by Staticaliza.")
        gr.Markdown("❗ • Enter the correct server password and press 'Get Address ✅' to retrieve the server join address.")
    with gr.Column():
        maintain_button = gr.Button("Maintain Space ☁️")
    with gr.Column():
        # type="password" masks the secret instead of showing it in clear text.
        address_input = gr.Textbox(label="Server Password", lines=1, type="password")
        address_output = gr.Textbox(label="Server Address", lines=1)
        get_button = gr.Button("Get Address ✅")
    with gr.Column():
        administrator_input = gr.Textbox(label="Administrator Password", lines=1, type="password")
        # Doubles as the console command and as the world name for "Load World".
        command_input = gr.Textbox(label="Administrator Input", lines=1)
        command_output = gr.Textbox(label="Administrator Output", lines=1)
        execute_button = gr.Button("Execute Command ▶️")
        load_button = gr.Button("Load World ⬆️")
        save_button = gr.Button("Save World ⬇️")
        restart_button = gr.Button("Restart Server 🔄️")
    maintain_button.click(fn=cloud, inputs=[], outputs=[], queue=False)
    get_button.click(fn=get_address, inputs=address_input, outputs=address_output, queue=False)
    execute_button.click(fn=execute_command, inputs=[administrator_input, command_input], outputs=command_output, queue=False)
    load_button.click(fn=load_manual, inputs=[administrator_input, command_input], outputs=command_output, queue=False)
    save_button.click(fn=save_manual, inputs=administrator_input, outputs=command_output, queue=False)
    restart_button.click(fn=restart_server, inputs=administrator_input, outputs=command_output, queue=False)
main.launch(show_api=True)