Spaces:
Paused
Paused
| import asyncio | |
| import os | |
| import threading | |
| from threading import Event | |
| from typing import Optional | |
| import datetime | |
| import requests | |
| import discord | |
| import gradio as gr | |
| import gradio_client as grc | |
| from discord import Permissions | |
| from discord.ext import commands | |
| from discord.utils import oauth_url | |
| from gradio_client.utils import QueueError | |
| event = Event() | |
| DISCORD_TOKEN = os.getenv("DISCORD_TOKEN") | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| async def wait(job): | |
| while not job.done(): | |
| await asyncio.sleep(0.2) | |
| intents = discord.Intents.all() | |
| bot = commands.Bot(command_prefix="$", intents=intents, help_command=None) | |
| async def uptime(ctx): | |
| """Displays the uptime of the bot.""" | |
| delta = datetime.datetime.utcnow() - bot.start_time | |
| hours, remainder = divmod(int(delta.total_seconds()), 3600) | |
| minutes, seconds = divmod(remainder, 60) | |
| days, hours = divmod(hours, 24) | |
| # Create a fancy embed with emojis | |
| embed = discord.Embed(title="Bot Uptime", color=discord.Color.green()) | |
| embed.add_field(name="Uptime", value=f"{days} days, {hours} hours, {minutes} minutes, {seconds} seconds", inline=False) | |
| embed.set_footer(text="Created by Cosmos") | |
| await ctx.send(embed=embed) | |
| async def ai(ctx, *, input_text: str): | |
| """Ask our AI model a question. (Session resets every 1 message!)"""\ | |
| try: | |
| client = Client("https://wop-xxx-opengpt.hf.space/") | |
| result = client.predict( | |
| input_text, | |
| 0.9, | |
| 1800, | |
| 0.9, | |
| 1.2, | |
| api_name="/chat" | |
| ) | |
| result = result[:-4] | |
| # Create an embed with the AI's response | |
| embed = discord.Embed(title="AI Response", description=result, color=discord.Color.green()) | |
| embed.add_field(name="Info", value="Session resets every `1` message!\nModel Name: `Mistral 7B`", inline=False) | |
| embed.set_footer(text="Created by Cosmos") | |
| # Reply with the embed | |
| await ctx.send(embed=embed) | |
| except Exception as e: | |
| await ctx.send(f"An error occurred: {str(e)}") | |
| async def verse(ctx): | |
| """Returns a random Bible verse.""" | |
| # Fetch a random Bible verse | |
| bible_api_url = "https://labs.bible.org/api/?passage=random&type=json" | |
| response = requests.get(bible_api_url) | |
| if response.status_code == 200: | |
| verse = response.json()[0] | |
| passage = f"**{verse['bookname']} {verse['chapter']}:{verse['verse']}** \n{verse['text']}" | |
| else: | |
| passage = "Unable to fetch Bible verse" | |
| # Create an embed | |
| embed = discord.Embed(title="Random Bible Verse", description=passage, color=discord.Color.blue()) | |
| embed.set_footer(text="Created by Cosmos") | |
| await ctx.send(embed=embed) | |
| async def kick(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"): | |
| """Kicks a member.""" | |
| try: | |
| await member.kick(reason=reason) | |
| await ctx.send(f"{member.mention} has been kicked. Reason: {reason}") | |
| except discord.Forbidden: | |
| await ctx.send("I don't have permission to kick members.") | |
| except discord.HTTPException: | |
| await ctx.send("An error occurred while trying to kick the member. Please try again later.") | |
| async def ban(ctx, member: discord.Member, *, reason: Optional[str] = "No reason provided"): | |
| """Bans a member.""" | |
| try: | |
| await member.ban(reason=reason) | |
| await ctx.send(f"{member.mention} has been banned. Reason: {reason}") | |
| except discord.Forbidden: | |
| await ctx.send("I don't have permission to ban members.") | |
| except discord.HTTPException: | |
| await ctx.send("An error occurred while trying to ban the member. Please try again later.") | |
| async def cmds(ctx): | |
| """Returns a list of commands and bot information.""" | |
| # Get list of commands | |
| command_list = [f"{command.name}: {command.help}" for command in bot.commands] | |
| # Get bot information | |
| bot_info = f"Bot Name: {bot.user.name}\nBot ID: {bot.user.id}" | |
| # Create an embed | |
| embed = discord.Embed(title="Bot prefix: $", color=discord.Color.blue()) | |
| embed.add_field(name="Commands", value="\n".join(command_list), inline=False) | |
| embed.add_field(name="Bot Information", value=bot_info, inline=False) | |
| embed.set_footer(text="Created by Cosmos") | |
| await ctx.send(embed=embed) | |
| async def update_status(): | |
| await bot.wait_until_ready() # Wait until the bot is fully ready | |
| while True: | |
| # Fetch the number of members in all guilds the bot is connected to | |
| member_count = sum(guild.member_count for guild in bot.guilds) | |
| # Update the bot's status | |
| await bot.change_presence(activity=discord.Activity(type=discord.ActivityType.watching, name=f"{member_count} users")) | |
| # Wait for 60 seconds before updating again | |
| await asyncio.sleep(60) | |
| async def on_ready(): | |
| bot.start_time = datetime.datetime.utcnow() | |
| print(f"Logged in as {bot.user} (ID: {bot.user.id})") | |
| event.set() | |
| print("------") | |
| bot.loop.create_task(update_status()) # Create the task within the on_ready event | |
| async def on_member_join(member): | |
| channel = discord.utils.get(member.guild.channels, name="👋wellcome-goodbye") | |
| # Fetch a random Bible verse | |
| bible_api_url = "https://labs.bible.org/api/?passage=random&type=json" | |
| response = requests.get(bible_api_url) | |
| if response.status_code == 200: | |
| verse = response.json()[0] | |
| passage = f"{verse['bookname']} {verse['chapter']}:{verse['verse']} - {verse['text']}" | |
| else: | |
| passage = "Unable to fetch Bible verse" | |
| # Create an embed | |
| embed = discord.Embed(title=f"Welcome to the server, {member.name}!", description=f"Hope you have a great stay here, <@{member.id}>!", color=discord.Color.blue()) | |
| embed.add_field(name="Random Bible Verse", value=passage, inline=False) | |
| embed.set_footer(text="Created by Cosmos") | |
| await channel.send(embed=embed) | |
| async def search(ctx, *, query: str): | |
| """Search for a bible verse.""" | |
| bible_api_url = f"https://labs.bible.org/api/?passage={query}&type=json" | |
| response = requests.get(bible_api_url) | |
| if response.status_code == 200: | |
| verses = response.json() | |
| if verses: | |
| # If verses are found, concatenate them into a single message | |
| passage = "\n".join(f"**{verse['bookname']} {verse['chapter']}:{verse['verse']}** \n{verse['text']}" for verse in verses) | |
| passage = truncate_response(passage) | |
| embed = discord.Embed(title=f"Search Results for '{query}'", description=passage, color=discord.Color.blue()) | |
| else: | |
| embed = discord.Embed(title="Search Results", description="No results found.", color=discord.Color.red()) | |
| else: | |
| embed = discord.Embed(title="Search Results", description="Unable to fetch search results.", color=discord.Color.red()) | |
| embed.set_footer(text="Created by Cosmos") | |
| await ctx.send(embed=embed) | |
| async def purge(ctx, limit: int): | |
| """Removes N amount of messages.""" | |
| try: | |
| await ctx.channel.purge(limit=limit + 1) # Add 1 to include the command message itself | |
| await ctx.send(f"{limit} messages have been purged.") | |
| except discord.Forbidden: | |
| await ctx.send("I don't have permission to manage messages.") | |
| except discord.HTTPException: | |
| await ctx.send("An error occurred while trying to purge messages. Please try again later.") | |
| async def on_command_error(ctx, error): | |
| if isinstance(error, commands.CommandNotFound): | |
| await ctx.send("Sorry, I couldn't find that command. Use `$cmds` to see the list of available commands.") | |
| elif isinstance(error, commands.MissingRequiredArgument): | |
| await ctx.send("Oops! Looks like you're missing some required arguments.") | |
| elif isinstance(error, commands.CheckFailure): | |
| await ctx.send("You do not have the permissions to execute this command.") | |
| else: | |
| # Log the error to console or your logging system | |
| print(f"An error occurred: {error}") | |
| # Additional error handling for unexpected errors | |
| async def on_error(event_method, *args, **kwargs): | |
| # Log the error to console or your logging system | |
| print(f"An error occurred in {event_method}: {sys.exc_info()}") | |
| # Error handling for unhandled exceptions | |
| async def on_command_error(ctx, error): | |
| if isinstance(error, commands.CommandInvokeError): | |
| original_error = error.original | |
| if isinstance(original_error, discord.Forbidden): | |
| await ctx.send("I don't have permissions to do that.") | |
| elif isinstance(original_error, discord.HTTPException): | |
| await ctx.send("An error occurred while processing the command. Please try again later.") | |
| else: | |
| # Log the error to console or your logging system | |
| print(f"Error: {original_error}") | |
| await ctx.send("An unexpected error occurred.") | |
| # running in thread | |
| def run_bot(): | |
| if not DISCORD_TOKEN: | |
| print("DISCORD_TOKEN NOT SET") | |
| event.set() | |
| else: | |
| bot.run(DISCORD_TOKEN) | |
| threading.Thread(target=run_bot).start() | |
| event.wait() | |
| with gr.Blocks() as demo: | |
| gr.Markdown( | |
| f""" | |
| # Discord bot is online! | |
| """ | |
| ) | |
| demo.launch() |