# Source: aihavin / app.py (Hugging Face Space, author SeyhaLite)
# Commit 41245b0 "Update app.py" (verified)
from flask import Flask, request, jsonify
import threading
import time
import logging
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, ContextTypes, filters
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import firebase_admin
from firebase_admin import credentials, firestore
import random
import string
import os
import json
import re
# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Initialize Flask app
app = Flask(__name__)

# Telegram Bot Config
# SECURITY: a bot token committed to source control is a leaked credential.
# The TELEGRAM_TOKEN environment variable now takes precedence; the literal is
# kept only as a fallback so existing deployments keep working, and it should
# be revoked and removed.
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN") or "7742872311:AAEP4cVwTSyKJYLsBhrKfokNFVQboVddWpc"
BOT_USERNAME = "khCharacterAibot"
ADMIN_ID = "6728254471"  # compared against str(user id) by the admin handlers

# Firebase Setup
if not os.path.exists("serviceAccountKey.json"):
    raise FileNotFoundError("Please upload 'serviceAccountKey.json' to the root directory")
cred = credentials.Certificate("serviceAccountKey.json")
if not firebase_admin._apps:  # skip initialisation when an app is already registered
    firebase_admin.initialize_app(cred)
db = firestore.client()

# Load a smaller model and tokenizer
model_name = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"  # 1.1B parameters, ~2.2GB in FP16
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,  # Use FP16 for lower memory usage
    # NOTE(review): "auto" lets the library choose weight placement; it is not
    # a CPU pin. `device` below is fixed to CPU - confirm the host is CPU-only.
    device_map="auto",
)
device = torch.device("cpu")  # tokenized inputs are moved here before generation

# Membership Plans
# Per-plan caps. Only the "ai" cap is read in this file; "scenarios" is unused here.
PLAN_LIMITS = {
    "Membership": {"ai": 1, "scenarios": 5},
    "Premium": {"ai": 2, "scenarios": 10},
    "Vip Premium Plus": {"ai": 3, "scenarios": 20},
}
# Firebase Data Functions
def get_user_info(user_id):
    """Read the profile document at users/<user_id>/info/data.

    Returns the document's fields as a dict, or None when the document
    does not exist.
    """
    snapshot = (
        db.collection("users")
        .document(user_id)
        .collection("info")
        .document("data")
        .get()
    )
    if not snapshot.exists:
        return None
    return snapshot.to_dict()
def set_user_info(user_id, data):
    """Store `data` in the profile document users/<user_id>/info/data."""
    profile_ref = (
        db.collection("users")
        .document(user_id)
        .collection("info")
        .document("data")
    )
    profile_ref.set(data)
def get_user_ai(user_id, ai_name):
    """Read the AI definition at users/<user_id>/ai/<ai_name>.

    Returns the document's fields as a dict, or None when it does not exist.
    """
    snapshot = (
        db.collection("users")
        .document(user_id)
        .collection("ai")
        .document(ai_name)
        .get()
    )
    if not snapshot.exists:
        return None
    return snapshot.to_dict()
def set_user_ai(user_id, ai_name, data):
    """Store `data` as the AI definition users/<user_id>/ai/<ai_name>."""
    ai_ref = (
        db.collection("users")
        .document(user_id)
        .collection("ai")
        .document(ai_name)
    )
    ai_ref.set(data)
def get_all_user_ai(user_id):
    """Return every document under users/<user_id>/ai as {document id: fields}."""
    collected = {}
    for snapshot in db.collection("users").document(user_id).collection("ai").stream():
        collected[snapshot.id] = snapshot.to_dict()
    return collected
def get_global_scenarios():
    """Return every document in the top-level "scenarios" collection as {id: fields}."""
    collected = {}
    for snapshot in db.collection("scenarios").stream():
        collected[snapshot.id] = snapshot.to_dict()
    return collected
def set_global_scenario(scenario_name, data):
    """Store `data` as the document scenarios/<scenario_name>."""
    scenario_ref = db.collection("scenarios").document(scenario_name)
    scenario_ref.set(data)
# Generate Smart AI Response with History
async def generate_response(context, chat_id, user_input, character, scenario, history=""):
    """Produce the reply of `character` (acting in `scenario`) to `user_input`.

    A "typing" action and a progress message are sent to `chat_id` first.
    Greetings get one of three canned replies; everything else is generated
    by the language model from a prompt built out of the admin training
    data, the last 200 characters of `history` and the user's message. When
    the generated text contains no character in the Khmer Unicode block
    (U+1780-U+17FF), a canned fallback reply is returned instead.

    Args:
        context: handler context; only `context.bot` is used.
        chat_id: chat that receives the typing action and progress message.
        user_input: the user's message text.
        character: name the AI speaks as.
        scenario: scenario text inserted into the prompt.
        history: previous conversation text (only the tail is used).

    Returns:
        The reply text.
    """
    await context.bot.send_chat_action(chat_id=chat_id, action="typing")
    await context.bot.send_message(chat_id=chat_id, text="AI αž€αŸ†αž–αž»αž„αž‚αž·αž...")

    # Greeting shortcut. The English alternatives are anchored on word
    # boundaries: an unanchored "hi" also matched "this", "which", "think", ...
    # and turned ordinary questions into greetings. Checked before the
    # Firestore read because the canned replies do not need training data.
    if re.search(r"αžŸαž½αžŸαŸ’αžαžΈ|\b(?:hello|hi)\b", user_input, re.IGNORECASE):
        greetings = [
            f"αžŸαž½αžŸαŸ’αžαžΈαž›αŸ„αž€αž’αŸ’αž“αž€! αžαŸ’αž‰αž»αŸ†αž‡αžΆ {character} αž“αŸ… {scenario}αŸ” αžαžΎαžαŸ’αž‰αž»αŸ†αž’αžΆαž…αž‡αž½αž™αž’αŸ’αžœαžΈαž”αžΆαž“?",
            f"αžŸαž½αžŸαŸ’αžαžΈ! αžαŸ’αž‰αž»αŸ† {character} αž˜αž€αž–αžΈ {scenario} αžšαžΈαž€αžšαžΆαž™αž”αžΆαž“αž‡αž½αž”αž›αŸ„αž€αž’αŸ’αž“αž€!",
            f"αžŸαž½αžŸαŸ’αžαžΈαž’αŸ’αž“αž€! αž“αŸ… {scenario} αž“αŸαŸ‡ αžαŸ’αž‰αž»αŸ† {character} αžŸαž”αŸ’αž”αžΆαž™αž…αž·αžαŸ’αžαžŽαžΆαžŸαŸ‹!"
        ]
        return random.choice(greetings)

    # Admin-curated question/answer pairs, stored in admin/training_data.
    admin_data = db.collection("admin").document("training_data").get()
    training_prompt = ""
    if admin_data.exists:
        pairs = admin_data.to_dict().get("data", [])
        training_prompt = "\n".join(f"Q: {item['question']} A: {item['answer']}" for item in pairs)

    system_prompt = f"""
You are {character}, a highly intelligent AI fluent in Khmer, speaking with authority, charm, and deep understanding.
You are in {scenario}, responding accurately and politely in natural Khmer.
Strictly follow Khmer grammar rules (αž‚αŸ„αžšαž–αžαžΆαž˜αž€αŸ’αž”αž½αž“αžœαŸαž™αŸ’αž™αžΆαž€αžšαžŽαŸαžαŸ’αž˜αŸ‚αžš) and use polite language (ឧ. αž›αŸ„αž€αž’αŸ’αž“αž€, αžαŸ’αž‰αž»αŸ†).
Use this knowledge to answer correctly: {training_prompt}
Conversation History (last 200 chars): {history[-200:]}
"""
    prompt = system_prompt + "\nαž’αŸ’αž“αž€αž”αŸ’αžšαžΎαž”αžΆαž“αžŸαž½αžš: " + user_input + "\n" + character + ": "

    inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True, max_length=1024)  # Reduced max_length for memory
    inputs = {key: value.to(device) for key, value in inputs.items()}

    # `max_length` in generate() counts the prompt tokens too, so a prompt
    # that filled the 1024-token window left no room for an answer. Keep the
    # former overall budget when there is room, but always allow a minimum.
    prompt_tokens = inputs["input_ids"].shape[1]
    new_token_budget = max(64, 1024 - prompt_tokens)
    outputs = model.generate(
        input_ids=inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        max_new_tokens=new_token_budget,
        pad_token_id=tokenizer.eos_token_id,
        temperature=0.7,
        do_sample=True,
        top_k=50,
        top_p=0.95
    )

    # The decoded text still contains the prompt; keep what follows the last
    # "<character>: " marker.
    decoded = tokenizer.decode(outputs[0], skip_special_tokens=True)
    response = decoded.split(character + ": ")[-1].strip()

    has_khmer = any(0x1780 <= ord(ch) <= 0x17FF for ch in response)  # Check for Khmer characters
    if not has_khmer:
        if "αž‡αž½αž™" in user_input:
            response = "αžαŸ’αž‰αž»αŸ†αžšαžΈαž€αžšαžΆαž™αž“αžΉαž„αž‡αž½αž™αž›αŸ„αž€αž’αŸ’αž“αž€! αžŸαžΌαž˜αž”αŸ’αžšαžΆαž”αŸ‹αžαŸ’αž‰αž»αŸ†αžαžΆ αžαžΎαžαŸ’αž‰αž»αŸ†αž’αžΆαž…αž‡αž½αž™αž’αŸ’αžœαžΈαž”αžΆαž“?"
        elif "αž’αžšαž‚αž»αžŽ" in user_input:
            response = "αž’αžšαž‚αž»αžŽαž›αŸ„αž€αž’αŸ’αž“αž€αžŠαŸ‚αž›αž”αžΆαž“αž“αž·αž™αžΆαž™αž’αž‰αŸ’αž…αžΉαž„! αžαŸ’αž‰αž»αŸ†αžŸαž”αŸ’αž”αžΆαž™αž…αž·αžαŸ’αžαž”αžΆαž“αž‡αž½αž™αŸ”"
        else:
            response = f"αžαŸ’αž‰αž»αŸ†αž˜αž·αž“αž”αŸ’αžšαžΆαž€αžŠαžαžΆαž™αž›αŸ‹αž…αŸ’αž”αžΆαžŸαŸ‹αž‘αŸαŸ” αž›αŸ„αž€αž’αŸ’αž“αž€αž”αžΆαž“αžŸαž½αžš '{user_input}' αž˜αŸ‚αž“αž‘αŸ? αžŸαžΌαž˜αž”αž‰αŸ’αž‡αžΆαž€αŸ‹αž”αž“αŸ’αžαŸ‚αž˜!"
    return response
# Command Handlers
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /start: register first-time users, then show the main menu.

    A first-time user gets a default profile on the "Membership" plan and
    the admin is notified. Banned users receive a refusal instead of the
    menu.
    """
    user_id = str(update.effective_user.id)
    user_data = get_user_info(user_id)
    if not user_data:
        user_data = {
            "name": update.effective_user.first_name,
            "username": update.effective_user.username or "",
            "history": "",
            "plan": "Membership",
            "banned": False,
            "stop_until": None,
            "message_count": 0,
            "last_active": time.time(),
            "ai_ratings": {},
            "selected_ai": None
        }
        set_user_info(user_id, user_data)
        # The admin notification is a courtesy. If it fails (for example the
        # admin chat is unreachable) the new user must still get the menu.
        try:
            await context.bot.send_message(ADMIN_ID, f"αž’αŸ’αž“αž€αž”αŸ’αžšαžΎαžαŸ’αž˜αžΈ: {user_data['name']} (@{user_data['username']}, ID: {user_id})")
        except Exception:
            logger.exception("Could not notify admin about new user %s", user_id)
    if user_data.get("banned"):
        await update.message.reply_text("αžŸαžΌαž˜αž‘αŸ„αžŸ! αž›αŸ„αž€αž’αŸ’αž“αž€αžαŸ’αžšαžΌαžœαž”αžΆαž“αž αžΆαž˜αžƒαžΆαžαŸ‹αŸ”")
        return
    await show_menu(update, context)
async def show_menu(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the main inline-keyboard menu.

    The fixed feature buttons are followed by one extra row holding a
    "chat with <name>" button for each AI the user owns. The former unused
    profile lookup was dropped: it cost a Firestore read per call and its
    result was never read.
    """
    user_id = str(update.effective_user.id)
    keyboard = [
        [InlineKeyboardButton("αž”αž„αŸ’αž€αžΎαž AI", callback_data="create_ai"),
         InlineKeyboardButton("αž€αŸ‚ AI", callback_data="edit_ai")],
        [InlineKeyboardButton("αž…αŸ‚αž€αžšαŸ†αž›αŸ‚αž€ AI", callback_data="share_ai"),
         InlineKeyboardButton("AI αž–αŸαž‰αž“αž·αž™αž˜", callback_data="top_ai")],
        [InlineKeyboardButton("αž”αŸ’αžαžΌαžšαž€αž˜αŸ’αž˜αžœαž·αž’αžΈ", callback_data="redeem"),
         InlineKeyboardButton("αž”αž„αŸ’αž αžΆαž‰ AI αžšαž”αžŸαŸ‹αžαŸ’αž‰αž»αŸ†", callback_data="my_ai")],
        [InlineKeyboardButton("αžŸαŸ’αžαžΆαž“αž—αžΆαž–αžαŸ’αž‰αž»αŸ†", callback_data="my_status"),
         InlineKeyboardButton("αž‚αŸ’αžšαž”αŸ‹αž‚αŸ’αžšαž„ (Admin)", callback_data="admin_panel")],
        [InlineKeyboardButton("αžŸαŸαžœαžΆαž€αž˜αŸ’αž˜αžŸαž˜αž‡αžΆαžœ", callback_data="subscriptions")]
    ]
    ai_list = get_all_user_ai(user_id)
    if ai_list:
        # All of the user's AIs share a single keyboard row.
        ai_buttons = [
            InlineKeyboardButton(f"αž‡αž‡αŸ‚αž€αž‡αžΆαž˜αž½αž™ {name}", callback_data=f"chat_with_{name}")
            for name in ai_list
        ]
        keyboard.append(ai_buttons)
    reply_markup = InlineKeyboardMarkup(keyboard)
    await update.message.reply_text("αžŸαž½αžŸαŸ’αžαžΈαž›αŸ„αž€αž’αŸ’αž“αž€! αžαŸ’αž‰αž»αŸ†αž‡αžΆ SokKhaαŸ” αž‡αŸ’αžšαžΎαžŸαžšαžΎαžŸαž˜αž»αžαž„αžΆαžš:", reply_markup=reply_markup)
async def my_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the caller's plan, activity and message count.

    The reply also carries one "rate <AI>" button per AI the user owns.
    "Active" means `last_active` falls within the last hour.

    This function is reached both from the /mystatus command and from the
    "my_status" inline button. A button press has no `update.message`, so
    the reply goes through `update.effective_message`, which is set in both
    cases.
    """
    reply_target = update.effective_message
    user_id = str(update.effective_user.id)
    user_data = get_user_info(user_id)
    if not user_data:
        await reply_target.reply_text("αžŸαžΌαž˜αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž‡αžΆαž˜αž½αž™ /start!")
        return

    cutoff = time.time() - 3600
    # Profiles live in users/<id>/info/data and nothing is ever written to
    # users/<id> itself, so a query over the "users" collection returns no
    # documents. list_documents() also enumerates such placeholder parents.
    active_users = 0
    for user_ref in db.collection("users").list_documents():
        info = get_user_info(user_ref.id)
        if info and info.get("last_active", 0) > cutoff:
            active_users += 1

    status = "αžŸαž€αž˜αŸ’αž˜" if user_data.get("last_active", 0) > cutoff else "αž’αžŸαž€αž˜αŸ’αž˜"
    message_count = user_data.get("message_count", 0)
    plan = user_data.get("plan", "Membership")

    owned_ai = get_all_user_ai(user_id)  # fetched once, previously read twice
    keyboard = []
    if owned_ai:
        keyboard = [[
            InlineKeyboardButton(f"αžœαžΆαž™αžαž˜αŸ’αž›αŸƒ AI: {name}", callback_data=f"rate_ai_{name}")
            for name in owned_ai
        ]]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await reply_target.reply_text(
        f"αžŸαŸ’αžαžΆαž“αž—αžΆαž–αžšαž”αžŸαŸ‹αž›αŸ„αž€αž’αŸ’αž“αž€:\n"
        f"- αž•αŸ‚αž“αž€αžΆαžš: {plan}\n"
        f"- αžŸαž€αž˜αŸ’αž˜: {status}\n"
        f"- αž’αŸ’αž“αž€αž”αŸ’αžšαžΎαžŸαž€αž˜αŸ’αž˜αžŸαžšαž»αž”: {active_users}\n"
        f"- αžŸαžΆαžšαžŸαžšαž»αž”: {message_count}\n"
        f"αž‡αŸ’αžšαžΎαžŸαžšαžΎαžŸ AI αžŠαžΎαž˜αŸ’αž”αžΈαžœαžΆαž™αžαž˜αŸ’αž›αŸƒ:",
        reply_markup=reply_markup
    )
# Subscription Services Display
async def show_subscriptions(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Replace the pressed menu message with a fixed list of subscription services."""
    subscriptions = [
        {"name": "Spud", "category": "Meal Kits", "price": "Starting at $60/week", "desc": "Fresh, local ingredient meal kits."},
        {"name": "Fubo", "category": "Sports Streaming", "price": "$79.99/month", "desc": "Live sports like NFL, NBA."},
        {"name": "CrateChef", "category": "Cooking Boxes", "price": "$49 bimonthly + shipping", "desc": "Chef-curated gourmet boxes."},
        {"name": "Wantable", "category": "Clothing Styling", "price": "$20/styling fee", "desc": "Personalized clothing boxes."}
    ]
    header = "αžŸαŸαžœαžΆαž€αž˜αŸ’αž˜αžŸαž˜αž‡αžΆαžœαž“αŸ…αžŸαž αžšαžŠαŸ’αž‹αž’αžΆαž˜αŸαžšαž·αž€ (αž˜αž·αž“αž˜αžΆαž“αž“αŸ…αž…αž€αŸ’αžšαž—αž–αž’αž„αŸ‹αž‚αŸ’αž›αŸαžŸ):\n\n"
    # One two-line entry per service, appended in list order.
    entries = [
        f"- **{item['name']}** ({item['category']}): {item['price']}\n {item['desc']}\n"
        for item in subscriptions
    ]
    response = header + "".join(entries)
    await update.callback_query.edit_message_text(response)
# Button Handler
async def button(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle an inline-keyboard press by dispatching on its callback data.

    Unregistered and banned users are turned away first. Branches that need
    a follow-up text message from the user store the pending action in
    `context.user_data["step"]`, which the text-message handler reads.

    Callback data carrying an AI name ("chat_with_<name>", "rate_ai_<name>",
    "rate_<name>_<stars>") is parsed by prefix length and from the right, so
    names containing "_" or the prefix text itself are recovered intact.
    """
    query = update.callback_query
    await query.answer()
    user_id = str(query.from_user.id)
    user_data = get_user_info(user_id)
    if not user_data:
        await query.edit_message_text("αžŸαžΌαž˜αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž‡αžΆαž˜αž½αž™ /start!")
        return
    if user_data.get("banned"):
        await query.edit_message_text("αžŸαžΌαž˜αž‘αŸ„αžŸ! αž›αŸ„αž€αž’αŸ’αž“αž€αžαŸ’αžšαžΌαžœαž”αžΆαž“αž αžΆαž˜αžƒαžΆαžαŸ‹αŸ”")
        return

    data = query.data
    if data == "create_ai":
        plan = user_data.get("plan", "Membership")
        # An unrecognised stored plan name falls back to the base plan's cap
        # instead of raising KeyError.
        limit = PLAN_LIMITS.get(plan, PLAN_LIMITS["Membership"])["ai"]
        if len(get_all_user_ai(user_id)) >= limit:
            await query.edit_message_text(f"αž›αŸ„αž€αž’αŸ’αž“αž€αž”αžΆαž“αžˆαžΆαž“αžŠαž›αŸ‹αž€αŸ†αžŽαžαŸ‹ {limit} AI αžŸαž˜αŸ’αžšαžΆαž”αŸ‹αž•αŸ‚αž“αž€αžΆαžš {plan}!")
        else:
            await query.edit_message_text("αž”αž‰αŸ’αž…αžΌαž›αžˆαŸ’αž˜αŸ„αŸ‡ AI (ឧ. SokKha):")
            context.user_data["step"] = "create_name"
    elif data == "edit_ai":
        ai_list = "\n".join(get_all_user_ai(user_id)) or "αž‚αŸ’αž˜αžΆαž“ AI"
        await query.edit_message_text(f"AI αžšαž”αžŸαŸ‹αž›αŸ„αž€αž’αŸ’αž“αž€:\n{ai_list}\nαž”αž‰αŸ’αž…αžΌαž›αžˆαŸ’αž˜αŸ„αŸ‡ AI αžŠαŸ‚αž›αž…αž„αŸ‹αž€αŸ‚:")
        context.user_data["step"] = "edit_ai_name"
    elif data == "share_ai":
        ai_list = "\n".join(get_all_user_ai(user_id)) or "αž‚αŸ’αž˜αžΆαž“ AI"
        await query.edit_message_text(f"AI αžšαž”αžŸαŸ‹αž›αŸ„αž€αž’αŸ’αž“αž€:\n{ai_list}\nαž”αž‰αŸ’αž…αžΌαž›αžˆαŸ’αž˜αŸ„αŸ‡ AI αžŠαŸ‚αž›αž…αž„αŸ‹αž…αŸ‚αž€αžšαŸ†αž›αŸ‚αž€:")
        context.user_data["step"] = "share_ai"
    elif data == "top_ai":
        scenarios = get_global_scenarios()
        # A scenario document without a usage counter ranks as 0.
        top_scenarios = sorted(
            scenarios.items(),
            key=lambda entry: entry[1].get("usage_count", 0),
            reverse=True,
        )[:3]
        response = "AI αž–αŸαž‰αž“αž·αž™αž˜:\n" + "\n".join([f"- {name}: {info.get('usage_count', 0)} αžŠαž„" for name, info in top_scenarios])
        await query.edit_message_text(response)
    elif data.startswith("chat_with_"):
        ai_name = data[len("chat_with_"):]
        ai_data = get_user_ai(user_id, ai_name) or get_global_scenarios().get(ai_name)
        if not ai_data:
            await query.edit_message_text("αžšαž€ AI αž˜αž·αž“αžƒαžΎαž‰!")
        else:
            # Persist the selection before generating, so it survives a
            # failure in the (slow) generation step.
            user_data["selected_ai"] = ai_name
            set_user_info(user_id, user_data)
            history = user_data.get("history", "")
            response = await generate_response(context, query.message.chat_id, "αžŸαž½αžŸαŸ’αžαžΈ", ai_name, ai_data["prompt"], history)
            await query.edit_message_text(response)
            history += f"αž’αŸ’αž“αž€αž”αŸ’αžšαžΎ: αžŸαž½αžŸαŸ’αžαžΈ\nAI: {response}\n"
            # Same 1000-character cap the text-message handler applies.
            user_data["history"] = history[-1000:]
            set_user_info(user_id, user_data)
    elif data == "redeem":
        await query.edit_message_text("αž”αž‰αŸ’αž…αžΌαž›αž€αžΌαžŠαž”αŸαžŽαŸ’αž…:")
        context.user_data["step"] = "redeem_coupon"
    elif data == "my_ai":
        ai_list = "\n".join([f"- {name}: {info['prompt']}" for name, info in get_all_user_ai(user_id).items()]) or "αž‚αŸ’αž˜αžΆαž“ AI"
        await query.edit_message_text(f"AI αžšαž”αžŸαŸ‹αž›αŸ„αž€αž’αŸ’αž“αž€:\n{ai_list}")
    elif data == "my_status":
        await my_status(update, context)
    elif data.startswith("rate_ai_"):
        # Must be tested before the generic "rate_" prefix below.
        ai_name = data[len("rate_ai_"):]
        keyboard = [
            [InlineKeyboardButton("⭐ 1", callback_data=f"rate_{ai_name}_1"),
             InlineKeyboardButton("⭐ 2", callback_data=f"rate_{ai_name}_2")],
            [InlineKeyboardButton("⭐ 3", callback_data=f"rate_{ai_name}_3"),
             InlineKeyboardButton("⭐ 4", callback_data=f"rate_{ai_name}_4")],
            [InlineKeyboardButton("⭐ 5", callback_data=f"rate_{ai_name}_5")]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text(f"αžœαžΆαž™αžαž˜αŸ’αž›αŸƒ AI '{ai_name}':", reply_markup=reply_markup)
    elif data.startswith("rate_"):
        # "rate_<name>_<stars>": split from the right so underscores inside
        # the AI name stay part of the name.
        ai_name, _, stars = data[len("rate_"):].rpartition("_")
        rating = int(stars)
        user_data.setdefault("ai_ratings", {})[ai_name] = rating
        set_user_info(user_id, user_data)
        await query.edit_message_text(f"αž’αŸ’αž“αž€αž”αžΆαž“αžœαžΆαž™αžαž˜αŸ’αž›αŸƒ '{ai_name}' αž‡αžΆ {rating} ⭐!")
    elif data == "admin_panel":
        if user_id != ADMIN_ID:
            await query.edit_message_text("αž›αŸ„αž€αž’αŸ’αž“αž€αž˜αž·αž“αž˜αžΆαž“αžŸαž·αž‘αŸ’αž’αž·!")
            return
        keyboard = [
            [InlineKeyboardButton("αžšαž€αŸ’αžŸαžΆαž‘αž·αž“αŸ’αž“αž“αŸαž™", callback_data="admin_save"),
             InlineKeyboardButton("αž•αŸ’αžŸαž–αŸ’αžœαž•αŸ’αžŸαžΆαž™αž’αžαŸ’αžαž”αž‘", callback_data="admin_broadcast_text")],
            [InlineKeyboardButton("αž•αŸ’αžŸαž–αŸ’αžœαž•αŸ’αžŸαžΆαž™αžšαžΌαž”αž—αžΆαž–", callback_data="admin_broadcast_photo"),
             InlineKeyboardButton("αž•αŸ’αžŸαž–αŸ’αžœαž•αŸ’αžŸαžΆαž™αžœαžΈαžŠαŸαž’αžΌ", callback_data="admin_broadcast_video")],
            [InlineKeyboardButton("αž•αŸ’αžŸαž–αŸ’αžœαž•αŸ’αžŸαžΆαž™αž”αž‰αŸ’αž‡αžΌαž“αž”αž“αŸ’αž", callback_data="admin_broadcast_forward"),
             InlineKeyboardButton("αž αžΆαž˜αžƒαžΆαžαŸ‹αž’αŸ’αž“αž€αž”αŸ’αžšαžΎ", callback_data="admin_ban")],
            [InlineKeyboardButton("αžŠαŸ„αŸ‡αž αžΆαž˜αžƒαžΆαžαŸ‹", callback_data="admin_unban"),
             InlineKeyboardButton("αž•αŸ’αž’αžΆαž€αž’αŸ’αž“αž€αž”αŸ’αžšαžΎ", callback_data="admin_stop")],
            [InlineKeyboardButton("αž…αŸ†αž“αž½αž“αž’αŸ’αž“αž€αž”αŸ’αžšαžΎ", callback_data="admin_count"),
             InlineKeyboardButton("αž”αž„αŸ’αž€αžΎαžαž€αžΌαžŠαž”αŸαžŽαŸ’αž…", callback_data="admin_coupon")],
            [InlineKeyboardButton("αž”αžŽαŸ’αžαž»αŸ‡αž”αžŽαŸ’αžαžΆαž› AI", callback_data="admin_train")]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await query.edit_message_text("αž‡αŸ’αžšαžΎαžŸαž˜αž»αžαž„αžΆαžš Admin:", reply_markup=reply_markup)
    elif data == "subscriptions":
        await show_subscriptions(update, context)
    elif data.startswith("admin_"):
        # Every other admin_* action; the callee re-checks the admin id.
        await handle_admin_action(query, context)
# Handle Messages with Selected AI
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle a plain text message.

    The message either continues a multi-step flow or is answered by the
    selected AI. The flow in progress (create / edit / share an AI, redeem
    a coupon, admin input) is tracked in `context.user_data["step"]`.
    Without a pending step the text is answered by the user's selected AI,
    or by the default "SokKha" character when none is selected or the
    selection can no longer be found. The exchange is appended to the
    stored history, which is capped at 1000 characters.
    """
    user_id = str(update.effective_user.id)
    user_data = get_user_info(user_id)
    text = update.message.text
    if not user_data:
        await update.message.reply_text("αžŸαžΌαž˜αž…αžΆαž”αŸ‹αž•αŸ’αžαžΎαž˜αž‡αžΆαž˜αž½αž™ /start!")
        return
    if user_data.get("banned"):
        await update.message.reply_text("αžŸαžΌαž˜αž‘αŸ„αžŸ! αž›αŸ„αž€αž’αŸ’αž“αž€αžαŸ’αžšαžΌαžœαž”αžΆαž“αž αžΆαž˜αžƒαžΆαžαŸ‹!")
        return
    # NOTE(review): the admin "stop" action stores `stop_until` on the
    # profile, but nothing in this handler reads it - confirm whether a
    # temporary suspension is meant to be enforced here.

    user_data["message_count"] = user_data.get("message_count", 0) + 1
    user_data["last_active"] = time.time()
    set_user_info(user_id, user_data)

    if "step" in context.user_data:
        step = context.user_data["step"]
        if step == "create_name":
            context.user_data["ai_name"] = text
            await update.message.reply_text("αž”αž‰αŸ’αž…αžΌαž›αžŸαŸαžŽαžΆαžšαžΈαž™αŸ‰αžΌ (ឧ. αž“αŸ…αž€αŸ’αž“αž»αž„αž—αžΌαž˜αž·αž˜αž½αž™):")
            context.user_data["step"] = "create_scenario"
        elif step == "create_scenario":
            ai_name = context.user_data["ai_name"]
            ai_data = {"prompt": text, "usage_count": 1, "creator": user_id}
            # A new AI is stored under the user and published globally.
            set_user_ai(user_id, ai_name, ai_data)
            set_global_scenario(ai_name, ai_data)
            await update.message.reply_text(f"αž”αžΆαž“αž”αž„αŸ’αž€αžΎαž AI '{ai_name}'!")
            del context.user_data["step"]
        elif step == "edit_ai_name":
            ai_data = get_user_ai(user_id, text)
            if not ai_data:
                await update.message.reply_text("αžšαž€ AI αž˜αž·αž“αžƒαžΎαž‰!")
            else:
                context.user_data["ai_name"] = text
                await update.message.reply_text("αž”αž‰αŸ’αž…αžΌαž›αžŸαŸαžŽαžΆαžšαžΈαž™αŸ‰αžΌαžαŸ’αž˜αžΈ:")
                context.user_data["step"] = "edit_ai_scenario"
        elif step == "edit_ai_scenario":
            ai_name = context.user_data["ai_name"]
            ai_data = get_user_ai(user_id, ai_name)
            if not ai_data:
                # The AI vanished between the two steps; report it instead of
                # failing on a None subscript.
                await update.message.reply_text("αžšαž€ AI αž˜αž·αž“αžƒαžΎαž‰!")
            else:
                ai_data["prompt"] = text
                set_user_ai(user_id, ai_name, ai_data)
                set_global_scenario(ai_name, ai_data)
                await update.message.reply_text(f"αž”αžΆαž“αž€αŸ‚ AI '{ai_name}'!")
            del context.user_data["step"]
        elif step == "share_ai":
            if get_user_ai(user_id, text):
                share_link = f"https://t.me/{BOT_USERNAME}?start=share_{user_id}_{text}"
                await update.message.reply_text(f"αž›αžΈαž„αž…αŸ‚αž€αžšαŸ†αž›αŸ‚αž€ AI '{text}': {share_link}")
            else:
                await update.message.reply_text("αžšαž€ AI αž˜αž·αž“αžƒαžΎαž‰!")
            del context.user_data["step"]
        elif step == "redeem_coupon":
            await redeem_coupon(update, context, text)
            del context.user_data["step"]
        elif step.startswith("admin_"):
            await handle_admin_input(update, context, text)
        return

    history = user_data.get("history", "")
    selected_ai = user_data.get("selected_ai")
    own_ai = get_user_ai(user_id, selected_ai) if selected_ai else None
    ai_data = own_ai or (get_global_scenarios().get(selected_ai) if selected_ai else None)
    if ai_data:
        response = await generate_response(context, update.message.chat_id, text, selected_ai, ai_data["prompt"], history)
        ai_data["usage_count"] = ai_data.get("usage_count", 0) + 1
        # The popularity ranking reads the global scenario documents, so the
        # counter has to reach that copy too. Previously only the user's copy
        # was updated when the user owned the AI, so the ranking never moved.
        # Create and edit also write the same dict to both places.
        if own_ai:
            set_user_ai(user_id, selected_ai, ai_data)
        set_global_scenario(selected_ai, ai_data)
    else:
        response = await generate_response(context, update.message.chat_id, text, "SokKha", "αž“αŸ…αž€αŸ’αž“αž»αž„αž—αžΌαž˜αž·αž˜αž½αž™", history)
    await update.message.reply_text(response)

    history += f"αž’αŸ’αž“αž€αž”αŸ’αžšαžΎ: {text}\nAI: {response}\n"
    user_data["history"] = history[-1000:]  # keep only the newest 1000 characters
    set_user_info(user_id, user_data)
# Admin Action Handler
async def handle_admin_action(query, context: ContextTypes.DEFAULT_TYPE):
    """Run the admin-panel action named by `query.data`.

    Only the user whose id equals ADMIN_ID may proceed. "admin_save" dumps
    every user's profile and AIs to user_data_backup.json, "admin_count"
    reports the number of users, and the remaining actions ask for input and
    record the pending step in `context.user_data["step"]`. Unknown actions
    are ignored.
    """
    user_id = str(query.from_user.id)
    if user_id != ADMIN_ID:
        await query.edit_message_text("αž›αŸ„αž€αž’αŸ’αž“αž€αž˜αž·αž“αž˜αžΆαž“αžŸαž·αž‘αŸ’αž’αž·!")
        return

    # action -> (prompt shown to the admin, step expecting the answer)
    input_prompts = {
        "admin_broadcast_text": ("αž•αŸ’αž‰αžΎαž’αžαŸ’αžαž”αž‘:", "admin_broadcast_text"),
        "admin_broadcast_photo": ("αž•αŸ’αž‰αžΎαžšαžΌαž”αž—αžΆαž–:", "admin_broadcast_photo"),
        "admin_broadcast_video": ("αž•αŸ’αž‰αžΎαžœαžΈαžŠαŸαž’αžΌ:", "admin_broadcast_video"),
        "admin_broadcast_forward": ("αž”αž‰αŸ’αž‡αžΌαž“αžŸαžΆαžšαžŠαŸ‚αž›αž…αž„αŸ‹αž•αŸ’αžŸαž–αŸ’αžœαž•αŸ’αžŸαžΆαž™:", "admin_broadcast_forward"),
        "admin_ban": ("αž”αž‰αŸ’αž…αžΌαž› ID:", "admin_ban"),
        "admin_unban": ("αž”αž‰αŸ’αž…αžΌαž› ID:", "admin_unban"),
        "admin_stop": ("αž”αž‰αŸ’αž…αžΌαž› ID αž“αž·αž„αž˜αŸ‰αŸ„αž„ (ឧ. 12345 24):", "admin_stop"),
        "admin_coupon": ("αž”αž‰αŸ’αž…αžΌαž›αž•αŸ‚αž“αž€αžΆαžš αž“αž·αž„αžαŸ’αž„αŸƒ (ឧ. Premium 30):", "admin_coupon"),
        "admin_train": ("αž”αž‰αŸ’αž…αžΌαž›αžŸαŸ†αžŽαž½αžš (Q):", "admin_train_q"),
    }

    action = query.data
    if action == "admin_save":
        # Nothing is ever written to users/<id> itself (only to its
        # sub-collections), so a collection query returns no documents;
        # list_documents() also enumerates such placeholder parents.
        data = {
            user_ref.id: {"info": get_user_info(user_ref.id), "ai": get_all_user_ai(user_ref.id)}
            for user_ref in db.collection("users").list_documents()
        }
        # ensure_ascii=False emits raw non-ASCII text, so the file encoding
        # must be pinned rather than left to the platform default.
        with open("user_data_backup.json", "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False)
        await query.edit_message_text("αž”αžΆαž“αžšαž€αŸ’αžŸαžΆαž‘αž·αž“αŸ’αž“αž“αŸαž™!")
    elif action == "admin_count":
        total = sum(1 for _ in db.collection("users").list_documents())
        await query.edit_message_text(f"αž…αŸ†αž“αž½αž“αž’αŸ’αž“αž€αž”αŸ’αžšαžΎ: {total}")
    elif action in input_prompts:
        prompt_text, next_step = input_prompts[action]
        await query.edit_message_text(prompt_text)
        context.user_data["step"] = next_step
# Admin Input Handler
async def _broadcast_to_users(send_one):
    """Await `send_one(chat_id)` for every user id; return (delivered, failed).

    Delivery is best effort: a failure for one recipient is logged and the
    loop continues with the next one.
    """
    delivered = failed = 0
    # list_documents() is used because nothing is ever written to users/<id>
    # itself, so a query over the "users" collection returns no documents.
    for user_ref in db.collection("users").list_documents():
        try:
            await send_one(user_ref.id)
        except Exception:
            failed += 1
            logger.warning("Broadcast to %s failed", user_ref.id, exc_info=True)
        else:
            delivered += 1
    logger.info("Broadcast finished: %d delivered, %d failed", delivered, failed)
    return delivered, failed


async def handle_admin_input(update: Update, context: ContextTypes.DEFAULT_TYPE, text):
    """Consume the admin's answer to the prompt recorded in the pending step.

    Does nothing unless the sender is ADMIN_ID and a step is pending. Once
    an action has been carried out the step is cleared. Previously it stayed
    set, so every later admin message repeated the action (for example, it
    was broadcast again). Input that cannot be used (unknown user id,
    non-numeric hours or days, missing media) leaves the step in place so
    the admin can retry.

    NOTE(review): the only MessageHandler registered in this file filters on
    TEXT, so photo/video messages may never reach the two media branches -
    confirm against the handler registration.
    """
    import secrets  # local import: coupon codes should not come from `random`

    user_id = str(update.effective_user.id)
    if user_id != ADMIN_ID or "step" not in context.user_data:
        return
    step = context.user_data["step"]
    message = update.message
    done = False  # set once the pending action has been carried out

    if step == "admin_broadcast_text":
        await _broadcast_to_users(lambda chat_id: context.bot.send_message(chat_id, text))
        await message.reply_text("αž”αžΆαž“αž•αŸ’αžŸαž–αŸ’αžœαž•αŸ’αžŸαžΆαž™αž’αžαŸ’αžαž”αž‘!")
        done = True
    elif step == "admin_broadcast_photo" and message.photo:
        photo = message.photo[-1].file_id
        caption = message.caption or ""
        await _broadcast_to_users(lambda chat_id: context.bot.send_photo(chat_id, photo, caption=caption))
        await message.reply_text("αž”αžΆαž“αž•αŸ’αžŸαž–αŸ’αžœαž•αŸ’αžŸαžΆαž™αžšαžΌαž”αž—αžΆαž–!")
        done = True
    elif step == "admin_broadcast_video" and message.video:
        video = message.video.file_id
        caption = message.caption or ""
        await _broadcast_to_users(lambda chat_id: context.bot.send_video(chat_id, video, caption=caption))
        await message.reply_text("αž”αžΆαž“αž•αŸ’αžŸαž–αŸ’αžœαž•αŸ’αžŸαžΆαž™αžœαžΈαžŠαŸαž’αžΌ!")
        done = True
    elif step == "admin_broadcast_forward":
        message_id = message.message_id
        await _broadcast_to_users(lambda chat_id: context.bot.forward_message(chat_id, user_id, message_id))
        await message.reply_text("αž”αžΆαž“αž”αž‰αŸ’αž‡αžΌαž“αž”αž“αŸ’αž!")
        done = True
    elif step in ("admin_ban", "admin_unban"):
        target_id = (text or "").strip()
        # User documents are keyed by the numeric Telegram id; anything else
        # cannot match a user and could form an invalid document path.
        target = get_user_info(target_id) if target_id.isdigit() else None
        if target:
            target["banned"] = step == "admin_ban"
            set_user_info(target_id, target)
            if step == "admin_ban":
                await message.reply_text(f"αž”αžΆαž“αž αžΆαž˜αžƒαžΆαžαŸ‹ ID {target_id}!")
            else:
                await message.reply_text(f"αž”αžΆαž“αžŠαŸ„αŸ‡αž αžΆαž˜αžƒαžΆαžαŸ‹ ID {target_id}!")
            done = True
    elif step == "admin_stop":
        parts = (text or "").split()
        if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit():
            target_id, hours = parts[0], int(parts[1])
            target = get_user_info(target_id)
            if target:
                target["stop_until"] = time.time() + hours * 3600
                set_user_info(target_id, target)
                await message.reply_text(f"αž”αžΆαž“αž•αŸ’αž’αžΆαž€ ID {target_id} αžšαž™αŸˆαž–αŸαž› {hours} αž˜αŸ‰αŸ„αž„!")
                done = True
    elif step == "admin_coupon":
        parts = (text or "").split()
        # NOTE(review): plan names are matched as a single whitespace-free
        # token, so a plan whose name contains spaces cannot be entered here.
        if len(parts) == 2 and parts[0] in PLAN_LIMITS and parts[1].isdigit():
            plan, days = parts[0], int(parts[1])
            alphabet = string.ascii_uppercase + string.digits
            code = "".join(secrets.choice(alphabet) for _ in range(10))
            db.collection("coupons").document(code).set({
                "plan": plan,
                "days": days,
                "used": False,
                "created": time.time()
            })
            await message.reply_text(f"αž€αžΌαžŠαž”αŸαžŽαŸ’αž…: {code} ({plan}, {days} αžαŸ’αž„αŸƒ)")
            done = True
    elif step == "admin_train_q":
        # First half of the two-message training flow; the step advances
        # instead of finishing.
        context.user_data["train_question"] = text
        await message.reply_text("αž”αž‰αŸ’αž…αžΌαž›αž…αž˜αŸ’αž›αžΎαž™ (A):")
        context.user_data["step"] = "admin_train_a"
    elif step == "admin_train_a":
        admin_data = db.collection("admin").document("training_data").get()
        training_data = admin_data.to_dict().get("data", []) if admin_data.exists else []
        training_data.append({"question": context.user_data["train_question"], "answer": text})
        db.collection("admin").document("training_data").set({"data": training_data})
        await message.reply_text("αž”αžΆαž“αž”αžŽαŸ’αžαž»αŸ‡αž”αžŽαŸ’αžαžΆαž› AI!")
        context.user_data.pop("train_question", None)
        done = True

    if done:
        context.user_data.pop("step", None)
# Coupon Redemption
async def redeem_coupon(update: Update, context: ContextTypes.DEFAULT_TYPE, code):
    """Apply the coupon `code` to the calling user's plan.

    An unusable code (blank, containing "/", unknown, or already used) gets
    the "invalid or already used" reply. Otherwise the user's plan is
    switched to the coupon's plan and the coupon is marked used with the
    redeemer's id.

    NOTE(review): the read-check-write sequence is not transactional, and
    the coupon's "days" value is only echoed back to the user here - no
    expiry is stored by this function.
    """
    invalid_reply = "αž€αžΌαžŠαž˜αž·αž“αžαŸ’αžšαžΉαž˜αžαŸ’αžšαžΌαžœ αž¬αž”αžΆαž“αž”αŸ’αžšαžΎαžšαž½αž…!"

    # The code becomes a Firestore document id: blank ids are rejected by the
    # client and "/" would be read as a path separator.
    code = (code or "").strip()
    if not code or "/" in code:
        await update.message.reply_text(invalid_reply)
        return

    coupon_ref = db.collection("coupons").document(code)
    snapshot = coupon_ref.get()
    coupon_data = snapshot.to_dict() if snapshot.exists else None
    if not coupon_data or coupon_data.get("used"):
        await update.message.reply_text(invalid_reply)
        return

    user_id = str(update.effective_user.id)
    user_data = get_user_info(user_id)
    user_data["plan"] = coupon_data["plan"]
    set_user_info(user_id, user_data)
    coupon_ref.update({"used": True, "user_id": user_id})
    await update.message.reply_text(f"αž”αžΆαž“αžŠαŸ†αž‘αžΎαž„αž•αŸ‚αž“αž€αžΆαžš {coupon_data['plan']} αžŸαž˜αŸ’αžšαžΆαž”αŸ‹ {coupon_data['days']} αžαŸ’αž„αŸƒ!")
# Application Setup
application = Application.builder().token(TELEGRAM_TOKEN).build()

# Handlers are added in the same order as before: the three commands, the
# inline-button callback, then the catch-all for non-command text.
for _handler in (
    CommandHandler("start", start),
    CommandHandler("menu", show_menu),
    CommandHandler("mystatus", my_status),
    CallbackQueryHandler(button),
    MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message),
):
    application.add_handler(_handler)
# Flask Routes
@app.route("/webhook", methods=["POST"])
def webhook():
    """Receive a Telegram update posted as JSON and pass it to the bot application."""
    payload = request.get_json(force=True)
    incoming = Update.de_json(payload, application.bot)
    # NOTE(review): with the Application/ContextTypes API used in this file,
    # process_update appears to be a coroutine function; called from this
    # synchronous view without awaiting, the update is presumably never
    # processed - confirm, and schedule it on the bot's event loop if so.
    application.process_update(incoming)
    return "OK", 200
@app.route("/", methods=["GET"])
def setup_webhook():
    """Register <SPACE_URL>/webhook as the bot's webhook and report the URL used."""
    # Replace with your Hugging Face Space URL once known
    base_url = os.getenv("SPACE_URL", "https://seyhalite-aihavin.hf.space")
    webhook_url = base_url + "/webhook"
    # NOTE(review): set_webhook is presumably a coroutine in the bot library
    # version this file targets; it is not awaited here - confirm the webhook
    # is actually registered. Also confirm that registering a webhook is
    # compatible with the polling started in the __main__ block.
    application.bot.set_webhook(webhook_url)
    return "Webhook set to " + webhook_url + "!", 200
# Run Flask in a thread
def run_flask():
    """Serve the Flask app on every interface at port 7860; blocks the calling thread."""
    listen_host, listen_port = "0.0.0.0", 7860
    app.run(host=listen_host, port=listen_port)
if __name__ == "__main__":
    # The web server runs as a daemon thread: a non-daemon thread kept the
    # process alive after polling stopped (for example on Ctrl-C), so the
    # program never exited on its own.
    flask_thread = threading.Thread(target=run_flask, daemon=True)
    flask_thread.start()
    # Start polling in the main thread
    application.run_polling()