discordbot / discord_bot.py
joytou's picture
Improve code, rename command to tree_command, add bot.command
79a7353
raw
history blame
8.89 kB
# This code is based on the following example:
# https://discordpy.readthedocs.io/en/stable/quickstart.html#a-minimal-bot
import sys
import os
import discord
from discord import app_commands
from discord.ext import commands
from threading import Thread
import json
from horde import HordeAPI
import inspect
from async_eval import eval
# Load the bot configuration from discord.json.
# JSON text is UTF-8 by specification, so the encoding is pinned here instead
# of depending on the platform's locale default (which would garble non-ASCII
# responses on systems whose default is not UTF-8).
with open("discord.json", "r", encoding="utf-8") as f:
    json_data = json.load(f)
# Dispatch table: rule type -> predicate called as predicate(content, message).
# Each predicate compares the rule's configured text with message.content.
check_functions = {
    "equals": lambda expected, msg: msg.content == expected,
    "contains": lambda fragment, msg: fragment in msg.content,
    "startswith": lambda prefix, msg: msg.content.startswith(prefix),
    "endswith": lambda suffix, msg: msg.content.endswith(suffix),
}
# Create the bot and keep a module-level handle on its app-command tree.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(
    # Prefix for text commands; '>' when the configuration does not set one.
    command_prefix=json_data.get("command_prefix", '>'),
    intents=intents,
)
tree = bot.tree
# Prefix put in front of the name of every generated app-command callback.
COMMAND_NAME_PREFIX = json_data.get("command_name_prefix", "discord_bot_call_")
def load_module_by_function_name(name: str = ""):
    """
    Import the module that a dotted function name refers to and bind it as a
    global of this module, so generated code such as ``await pkg.func(...)``
    can resolve ``pkg`` when it runs.

    Names without a dot (functions defined in this file) need no import and
    are ignored. If the first component is already a non-module global here
    (for example an imported class), nothing is imported. For deeper paths
    such as ``pkg.sub.func`` the intermediate components are imported too,
    as long as they are modules; the first component that is not a module
    (e.g. a class) ends the walk.

    Args:
        name: function name, optionally qualified as ``module.function``

    Raises:
        ModuleNotFoundError: the top-level module cannot be imported
    """
    # Local imports keep this function self-contained.
    import importlib
    import types

    # Everything before the last dot is the candidate module path.
    parts = name.split(".")[:-1]
    if not parts or not parts[0]:
        return
    top_level = parts[0]

    already_bound = globals().get(top_level)
    if already_bound is not None and not isinstance(already_bound, types.ModuleType):
        return

    # importlib instead of exec("import ..."): exec inside a function would
    # bind the name in the function's locals only, and would execute text
    # taken straight from the configuration file.
    globals()[top_level] = importlib.import_module(top_level)

    for depth in range(2, len(parts) + 1):
        candidate = ".".join(parts[:depth])
        try:
            importlib.import_module(candidate)
        except ModuleNotFoundError as exc:
            # Only swallow "candidate itself is not a module"; a missing
            # dependency inside a real submodule must still surface.
            if exc.name != candidate:
                raise
            break
def generate_discord_command_callback_param_str(pre: str = '', parameters: list = None) -> str:
    """
    Build the parameter list source for a generated command callback.

    The result is meant to be placed between the parentheses of a ``def``
    header. It is returned as a single line without surrounding whitespace.

    Args:
        pre: fixed leading parameter(s), e.g. ``'ctx'``
        parameters: list of dicts with ``name`` and ``type`` keys; ``None``
            (the default) is treated as an empty list
    Returns:
        str: ``pre`` followed by ``", <name>: <type>"`` for every parameter
    """
    # None instead of a shared mutable [] default.
    annotated = [
        f", {param['name']}: {param['type']}"
        for param in parameters or []
    ]
    return pre + "".join(annotated)
def generate_user_function_param_str(parameters: list = None) -> str:
    """
    Build the keyword-argument source used to call a user-defined function.

    Args:
        parameters: list of dicts with a ``name`` key; ``None`` (the default)
            is treated as an empty list
    Returns:
        str: ``"<name>=<name>"`` items joined by ``", "``; empty when there
            are no parameters
    """
    # None instead of a shared mutable [] default.
    return ", ".join(
        f"{param['name']}={param['name']}" for param in parameters or []
    )
def generate_tree_add_command(command: dict = {}) -> str:
    """
    Build the source of the ``tree.add_command(...)`` call for one command.

    The callback referenced is ``COMMAND_NAME_PREFIX + command['name']``, so
    the callback function must already have been defined under that name.

    Args:
        command: command definition; reads ``name`` and ``description``
    Returns:
        str: source code to be executed at module level
    """
    # !r emits a proper Python string literal, so a quote, backslash or line
    # break in the configured text cannot break the generated statement.
    return f'''
tree.add_command(app_commands.Command(
    name={command['name']!r},
    description={command['description']!r},
    callback={COMMAND_NAME_PREFIX}{command['name']}
))
'''
def generate_discord_command_callback_function_str(command: dict = {}) -> str:
    """
    Build the source of the async callback for one app command.

    The generated function is named ``COMMAND_NAME_PREFIX + command['name']``.
    It defers the interaction, awaits ``command['function']`` with the
    declared parameters passed by keyword, and sends the result as a
    follow-up unless the result is ``None``.

    Args:
        command: command definition; reads ``name``, ``function`` and the
            optional ``parameters`` list
    Returns:
        str: source code to be executed at module level
    """
    parameters = command.get('parameters', [])
    signature = generate_discord_command_callback_param_str(
        'interaction: discord.Interaction', parameters
    )
    call_args = generate_user_function_param_str(parameters)
    # The template is written flush-left on purpose: this text is compiled as
    # module-level code, so its own indentation is significant.
    return f'''
async def {COMMAND_NAME_PREFIX}{command['name']}({signature}):
    await interaction.response.defer()
    result = await {command['function']}({call_args})
    if result is not None:
        await interaction.followup.send(result)
'''
def generate_bot_command_send(command: dict = {}) -> str:
    """
    Build the ``ctx.send`` statement for a fixed-response bot command.

    Args:
        command: command definition; reads ``response``
    Returns:
        str: a single ``await ctx.send(<literal>)`` statement
    """
    # !r emits a proper Python literal, so quotes, backslashes or line breaks
    # in the configured response cannot break the generated statement.
    return f"await ctx.send({command['response']!r})"
def generate_bot_command_callback(command: dict = {}) -> str:
    """
    Build the statements that call the user function of a bot command and
    send what it returns.

    The statements are returned without leading indentation; the caller is
    responsible for indenting them into a function body. A ``None`` result is
    not sent, matching the generated app-command callback.

    Args:
        command: command definition; reads ``function`` and the optional
            ``parameters`` list
    Returns:
        str: the generated statements, one per line
    """
    call_args = generate_user_function_param_str(command.get('parameters', []))
    return "\n".join([
        f"result = await {command['function']}({call_args})",
        "if result is not None:",
        "    await ctx.send(result)",
    ])


def generate_bot_command_def(command: dict = {}) -> str:
    """
    Build the source of a ``@bot.command()`` function for one command.

    A command with a ``response`` key sends that fixed text; any other
    command awaits its ``function`` and sends the result.

    Args:
        command: command definition; reads ``name``, the optional
            ``parameters`` list, and either ``response`` or ``function``
    Returns:
        str: source code to be executed at module level
    """
    signature = generate_discord_command_callback_param_str(
        'ctx', command.get('parameters', [])
    )
    if "response" in command:
        body = generate_bot_command_send(command)
    else:
        body = generate_bot_command_callback(command)
    # Indent every body line here so the result does not depend on how the
    # body generators lay out their text.
    indented_body = "\n".join("    " + line for line in body.splitlines())
    return (
        "\n"
        "@bot.command()\n"
        f"async def {command['name']}({signature}):\n"
        f"{indented_body}\n"
    )
# Register every command declared in the configuration.
# The exec() calls stay at module level on purpose: the generated functions
# must be defined as globals of this module.
# NOTE(security): the executed source is assembled from discord.json, so that
# file must be trusted.
for command in json_data.get("app_command", []):
    load_module_by_function_name(command["function"])
    exec(generate_discord_command_callback_function_str(command))
    exec(generate_tree_add_command(command))

for command in json_data.get("command", []):
    # Fixed-response commands have no function, hence nothing to import.
    if "function" in command:
        load_module_by_function_name(command["function"])
    exec(generate_bot_command_def(command))
async def greet(name: str):
    """Return the greeting text for *name*."""
    greeting = "Hello, {}!".format(name)
    return greeting
async def get_kudos():
    """
    Report the kudos value from ``HordeAPI.getUserDetails()``.

    Returns:
        str: the kudos message when the payload has a ``kudos`` key,
            otherwise an error message built from ``code`` and ``reason``
    """
    async with HordeAPI.getUserDetails() as details:
        if "kudos" in details:
            return f'The amount of Kudos this user has is {details["kudos"]}'
        # No "kudos" key: the payload is treated as an error description.
        return f'Error: {details["code"]} {details["reason"]}'
async def generate_status(id: str):
    """
    Describe the state of a generation request, or return its image.

    Args:
        id: generation identifier, passed to ``HordeAPI.generateCheck`` and
            ``HordeAPI.generateStatus``
    Returns:
        str: in order of precedence: an error message when the check payload
            has no ``kudos`` key; an "impossible" or "faulted" notice; the
            ``img`` of the first generation when the request is done; a
            progress message while images are processing; otherwise the
            queue position and wait time
    """
    async with HordeAPI.generateCheck(id) as details:
        # No "kudos" key: the payload is treated as an error description.
        if "kudos" not in details:
            return f'Check Error: {details["code"]} {details["reason"]}'
        if not details["is_possible"]:
            return "This generation is impossible."
        if details["faulted"]:
            return "This generation is faulted."
        if details["done"]:
            async with HordeAPI.generateStatus(id) as generation_detail:
                if "generations" not in generation_detail:
                    return f'Status Error: {generation_detail["code"]} {generation_detail["reason"]}'
                generations = generation_detail["generations"]
                # Only the first image is returned; an empty list falls
                # through to the progress messages below.
                if generations:
                    return generations[0]["img"]
        if int(details["processing"]) > 0:
            # Summed in one expression. Writing the terms as separate lines
            # that start with "+" makes each one its own statement, leaving
            # the total equal to "finished" alone.
            # NOTE(review): queue_position looks like a rank rather than an
            # image count; confirm that it belongs in this total.
            total = sum(
                int(details[key])
                for key in ("finished", "processing", "queue_position", "restarted", "waiting")
            )
            return f'Processing image: {details["processing"]}/{total}'
        return f'Position in queue: {details["queue_position"]}, wait time: {details["wait_time"]}s'
@bot.event
async def on_ready():
    """Sync the application commands and print the logged-in user."""
    # `tree` is the same object as `bot.tree`, so a single sync is enough;
    # syncing twice only repeats the same request.
    await tree.sync()
    print('We have logged in as {0.user}'.format(bot))
@bot.event
async def on_message(message):
    """
    Apply the configured message rules, then run normal command processing.

    Messages written by the bot itself are ignored. Every rule under the
    ``message`` key of the configuration is tested against the message; a
    matching rule either awaits its ``function`` and sends the result, or
    sends its fixed ``response``.

    Args:
        message: the incoming message
    """
    if message.author == bot.user:
        return

    for rule in json_data.get("message", []):
        rule_type = rule["type"]
        content = rule["content"]
        # Pick the predicate for this rule type; unknown types never match.
        matches = check_functions.get(rule_type)
        if matches is None or not matches(content, message):
            continue
        if "function" in rule:
            # `eval` is the one imported from async_eval at the top of the
            # file, which shadows the builtin.
            # NOTE(security): the evaluated text comes from discord.json, so
            # that file must be trusted.
            result = eval(f"await {rule['function']}()")
            # Skip the send when the function returns nothing, as the
            # generated app-command callbacks do; sending None would raise
            # and command processing below would be skipped.
            if result is not None:
                await message.channel.send(result)
        elif "response" in rule:
            # No function configured: send the predefined response.
            await message.channel.send(rule["response"])

    # Defining on_message replaces the default handler, so commands must be
    # dispatched explicitly.
    await bot.process_commands(message)
async def sendMessageToChannelHelper(data):
    """
    Send ``data`` as an embed to the channel named by ``CHANNEL_ID``.

    Title: ``log_tag`` if present, else ``id``, else a placeholder.
    Description: ``log_message`` if present, else ``model``, else a
    placeholder. The embed is red for log payloads whose ``log_level`` is
    ``assert`` or ``error`` and green otherwise. The ``img`` entry becomes
    the embed image; every other entry becomes an inline field.

    Args:
        data: mapping of field names to values
    """
    channel = await bot.fetch_channel(os.environ.get("CHANNEL_ID"))

    # Later checks win: log_tag overrides id, log_message overrides model.
    title = "Empty Title"
    if "id" in data:
        title = data["id"]
    if "log_tag" in data:
        title = data["log_tag"]

    description = "Empty Description"
    if "model" in data:
        description = data["model"]
    if "log_message" in data:
        description = data["log_message"]

    is_log_payload = "log_tag" in data or "log_message" in data
    # A log payload without log_level is treated as non-error instead of
    # raising KeyError.
    log_level = data["log_level"] if "log_level" in data else None
    color = 0x00ff00
    if is_log_payload and log_level in ("assert", "error"):
        color = 0xff0000

    embed = discord.Embed(title=title, description=description, color=color)
    # Copy the entries of data into the embed.
    for field in data:
        if field == "img":
            embed.set_image(url=data[field])
        else:
            embed.add_field(name=field, value=data[field], inline=True)

    await channel.send(embed=embed)
def sendMessageToChannel(data):
    """
    Schedule sendMessageToChannelHelper(data) on the bot's event loop.

    Returns immediately; the message is sent when the loop runs the task.

    Args:
        data: mapping forwarded unchanged to sendMessageToChannelHelper
    """
    # NOTE(review): loop.create_task is not thread-safe. If this is called
    # from a thread other than the one running the bot's loop, consider
    # asyncio.run_coroutine_threadsafe; confirm against the callers.
    schedule = bot.loop.create_task
    schedule(sendMessageToChannelHelper(data))
def run():
    """
    Start the bot with the configured token.

    The token is read from the ``token`` key of the configuration; when that
    key is missing, null or empty, the ``TOKEN`` environment variable is used
    instead.

    Raises:
        Exception: no token is available from either source
        discord.HTTPException: any HTTP failure other than status 429, which
            is reported on stdout instead
    """
    try:
        # `or` chaining so that a null or empty token in the JSON neither
        # reaches bot.run() nor hides the environment variable.
        token = json_data.get("token") or os.environ.get("TOKEN") or ""
        if not token:
            raise Exception("Please add your token to the Secrets pane.")
        bot.run(token)
    except discord.HTTPException as e:
        if e.status != 429:
            raise
        print(
            "The Discord servers denied the connection for making too many requests"
        )
        print(
            "Get help from https://stackoverflow.com/questions/66724687/in-discord-py-how-to-solve-the-error-for-toomanyrequests"
        )
def discord_bot():
    """Print a start-up notice, then hand control to run()."""
    print("Running discord_bot")
    run()


# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    discord_bot()