nomen/nomen/main.py

295 lines
7.9 KiB
Python

import io
import logging
import os
import pprint
import sys
import textwrap
import traceback
from contextlib import redirect_stdout
from disnake import Guild, Intents, Message
from disnake.ext import commands
from disnake.ext.commands import Bot, Paginator
from dotenv import find_dotenv, load_dotenv
from .db import run_db_migrations, setup_db
from .notifications import Notifications
from .settings import Settings
from .utils import cleanup_code
# Logger setup
logger_disnake = logging.getLogger("disnake")
logger_disnake.setLevel(logging.WARNING)
log = logging.getLogger("nomen")
log.setLevel(logging.DEBUG)
formatter = logging.Formatter("{asctime} {levelname:8} {name}: {message}", "%Y-%m-%d %H:%M:%S", "{")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logging.getLogger(None).addHandler(handler)
if load_dotenv(find_dotenv(usecwd=True)):
log.debug("Loaded .env")
else:
log.debug("Didn't find .env")
TOKEN = os.getenv("TOKEN")
DB_FILE = os.getenv("DB_FILE") or "nomen.sqlite3"
DEFAULT_PREFIX = os.getenv("DEFAULT_PREFIX") or ">"
async def get_prefix(the_bot, message: Message):
if not message.guild:
return commands.when_mentioned_or(DEFAULT_PREFIX)(the_bot, message)
gp = await the_bot.get_guild_prefix(message.guild)
if gp == "":
return commands.when_mentioned(the_bot, message)
return commands.when_mentioned_or(gp)(the_bot, message)
class Nomen(Bot):
def __init__(self, description=None, **options):
super().__init__(
get_prefix,
description=description,
command_sync_flags=options.get("sync_flags"),
allowed_mentions=options.get("allowed_mentions"),
intents=options.get("intents"),
)
self.db = None # Setup in start
self.prefixes = {}
async def get_guild_prefix(self, guild: Guild):
if guild.id in self.prefixes:
return self.prefixes[guild.id]
cur = await self.db.execute(
"SELECT prefix FROM guilds WHERE guild_id=? LIMIT 1",
[guild.id],
)
prefix = await cur.fetchone() or [DEFAULT_PREFIX]
prefix = prefix[0]
self.prefixes[guild.id] = prefix
return prefix
async def set_guild_prefix(self, guild: Guild, prefix):
await self.db.execute("REPLACE INTO guilds (guild_id, prefix) VALUES(?, ?)", (guild.id, prefix))
self.prefixes[guild.id] = prefix
async def start(self, *args, **kwargs):
self.db = await setup_db(DB_FILE)
await super().start(*args, **kwargs)
async def close(self):
await super().close()
await self.db.close()
async def get_setting(self, user_id, setting):
return await self.db.fetch_singleton(f"SELECT {setting} FROM users WHERE user_id=?", (user_id,))
async def set_setting(self, user_id, setting, value: bool):
await self.db.execute(f"REPLACE INTO users (user_id, {setting}) VALUES(?, ?)", (user_id, value))
await self.db.commit()
bot = Nomen(
description="Keeper of Names",
intents=Intents(
guilds=True,
messages=True,
message_content=True,
),
sync_flags=commands.CommandSyncFlags(
sync_commands=False,
),
)
bot.add_cog(Notifications(bot))
bot.add_cog(Settings(bot))
@bot.event
async def on_ready():
log.info("Logged in as:")
log.info(bot.user)
log.info(bot.user.id)
log.info("-------------")
bot.app_info = await bot.application_info()
bot.owner = bot.app_info.owner
@bot.event
async def on_command_error(ctx, error):
if isinstance(error, commands.CommandNotFound):
return
if isinstance(
error,
(
commands.UserInputError,
commands.CheckFailure,
),
):
await ctx.send(error)
return
log.error(error, exc_info=error)
timestamp = int(ctx.message.created_at.timestamp())
paginator = Paginator()
ptraceback = "".join(traceback.format_exception(error))
for line in ptraceback.split("\n"):
paginator.add_line(line)
await ctx.bot.owner.send(
f"# Command Error Occurred: {error}\n"
f"## Context\n"
f"**From:** {ctx.author.mention} ({ctx.author.name})\n"
f"<t:{timestamp}>, <t:{timestamp}:R> | [Jump to Command]({ctx.message.jump_url}) | <#{ctx.channel.id}>\n"
f"## Traceback"
)
for page in paginator.pages:
await ctx.bot.owner.send(page)
await ctx.send(
f"Error: {error}\n\n**Note:** Infinidoge (Developer) has already been sent a notification that an error occurred and will come investigate when available."
)
@bot.event
async def on_error(event, *args, **kwargs):
pargs = pprint.pformat(args)
pkwargs = pprint.pformat(kwargs)
try:
paginator = Paginator()
for line in traceback.format_exc().split("\n"):
paginator.add_line(line)
await bot.owner.send(
f"# Error Occurred: Event `{event[:100]}`\n"
f"## args\n"
f"```\n{pargs}\n```\n"
f"## kwargs\n"
f"```\n{pkwargs}\n```\n"
f"## Traceback"
)
for page in paginator.pages:
await bot.owner.send(page)
except: # Don't want recursive errors
pass
log.error(f"Error in event {event}.\nargs={pargs}\nkwargs={pkwargs}", exc_info=sys.exc_info())
@bot.command(hidden=True)
@commands.is_owner()
async def echo(ctx, arg):
await ctx.send(arg)
@bot.command(hidden=True)
@commands.is_owner()
async def sql(ctx, *, query):
params = {
"guild_id": ctx.guild.id,
}
query = cleanup_code(query)
cur = await ctx.bot.db.execute(query)
result = await cur.fetchall()
# TODO: Finish SQL evaluation command
@bot.command(hidden=True, name="eval")
@commands.is_owner()
async def admin_eval(ctx, *, body: str):
env = {
"bot": ctx.bot,
"ctx": ctx,
"channel": ctx.channel,
"author": ctx.author,
"guild": ctx.guild,
"message": ctx.message,
"log": log.getChild("eval"),
}
env.update(globals())
body = cleanup_code(body)
stdout = io.StringIO()
to_compile = "async def func():\n{}".format(textwrap.indent(body, " "))
try:
exec(to_compile, env)
except Exception as e:
return await ctx.send("```py\n{}: {}\n```".format(e.__class__.__name__, e))
func = env["func"]
try:
with redirect_stdout(stdout):
ret = await func()
except Exception:
value = stdout.getvalue()
ret = traceback.format_exc()
try:
await ctx.message.add_reaction("\u2705")
except:
pass
else:
value = stdout.getvalue()
try:
await ctx.message.add_reaction("\u2705")
except:
pass
paginator = Paginator(prefix="```py", suffix="```")
if value:
for line in str(value).split("\n"):
paginator.add_line(line)
paginator.close_page()
if ret:
for line in str(ret).split("\n"):
paginator.add_line(line)
paginator.close_page()
for page in paginator.pages:
await ctx.send(page)
@bot.command()
async def ping(ctx):
await ctx.send("Pong")
@bot.command()
@commands.has_guild_permissions(manage_guild=True)
async def prefix(ctx, prefix=None):
if prefix is None:
prefix = await ctx.bot.get_guild_prefix(ctx.guild)
if prefix == "":
await ctx.send(f"{ctx.guild.name} prefix is mentioning me only")
else:
await ctx.send(f"{ctx.guild.name} prefix is `{prefix}`")
else:
await ctx.bot.set_guild_prefix(ctx.guild, prefix)
if prefix == "":
await ctx.send(f"Set {ctx.guild.name} prefix to mentioning me only")
else:
await ctx.send(f"Set {ctx.guild.name} prefix to `{prefix}`")
def run():
try:
run_db_migrations(DB_FILE)
except RuntimeError:
pass
else:
bot.run(TOKEN)