init 2
This commit is contained in:
parent
fd9381e746
commit
2df2a08ded
7 changed files with 580 additions and 34 deletions
4
.gitignore
vendored
4
.gitignore
vendored
|
@ -8,3 +8,7 @@ result
|
|||
# Python
|
||||
.ruff_cache
|
||||
.ropeproject
|
||||
|
||||
# Runtime
|
||||
nomen.sqlite3
|
||||
nomen.sqlite3-journal
|
||||
|
|
43
nomen/db.py
Normal file
43
nomen/db.py
Normal file
|
@ -0,0 +1,43 @@
|
|||
import aiosqlite
|
||||
|
||||
from .utils import contains
|
||||
|
||||
|
||||
async def setup_db(db_file):
|
||||
db = await aiosqlite.connect(db_file)
|
||||
|
||||
await db.executescript("""
|
||||
BEGIN;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS keywords (
|
||||
guild_id INTEGER NOT NULL,
|
||||
keyword TEXT NOT NULL,
|
||||
user_id INTEGER NOT NULL,
|
||||
regex INTEGER NOT NULL DEFAULT 0 CHECK(regex IN (0, 1)),
|
||||
count INTEGER NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY (guild_id, keyword, user_id)
|
||||
)
|
||||
WITHOUT ROWID;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS guilds (
|
||||
guild_id INTEGER NOT NULL PRIMARY KEY,
|
||||
prefix TEXT NOT NULL DEFAULT ">"
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
user_id INTEGER NOT NULL PRIMARY KEY,
|
||||
disabled INTEGER NOT NULL DEFAULT 0 CHECK(disabled IN (0, 1))
|
||||
);
|
||||
|
||||
COMMIT;
|
||||
|
||||
PRAGMA optimize(0x10002);
|
||||
PRAGMA main.synchronous = NORMAL;
|
||||
""")
|
||||
|
||||
await db.create_function("contains", 3, contains, deterministic=True)
|
||||
|
||||
return db
|
||||
|
||||
|
||||
# TODO: Database versioning and migrations
|
|
@ -2,15 +2,18 @@ import logging
|
|||
import os
|
||||
import sys
|
||||
|
||||
import aiosqlite as sqlite
|
||||
from disnake import Guild, Intents, Message
|
||||
from disnake.ext import commands
|
||||
from disnake.ext.commands import Bot
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
|
||||
from .db import setup_db
|
||||
from .notifications import Notifications
|
||||
from .settings import Settings
|
||||
|
||||
# Logger setup
|
||||
logger_disnake = logging.getLogger("disnake")
|
||||
logger_disnake.setLevel(logging.INFO)
|
||||
logger_disnake.setLevel(logging.WARNING)
|
||||
|
||||
log = logging.getLogger("nomen")
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
@ -22,18 +25,24 @@ logger_disnake.addHandler(handler)
|
|||
log.addHandler(handler)
|
||||
|
||||
if load_dotenv(find_dotenv(usecwd=True)):
|
||||
log.info("Loaded .env")
|
||||
log.debug("Loaded .env")
|
||||
else:
|
||||
log.debug("Didn't find .env")
|
||||
|
||||
TOKEN = os.getenv("TOKEN")
|
||||
DB_FILE = os.getenv("DB_FILE") or "nomen.sqlite3"
|
||||
DEFAULT_PREFIX = os.getenv("DEFAULT_PREFIX") or ">"
|
||||
|
||||
|
||||
async def get_prefix(the_bot, message: Message):
|
||||
if not message.guild:
|
||||
return commands.when_mentioned_or(DEFAULT_PREFIX)(the_bot, message)
|
||||
|
||||
gp = await the_bot.get_guild_prefix(message.guild)
|
||||
|
||||
if gp == "":
|
||||
return commands.when_mentioned(the_bot, message)
|
||||
|
||||
return commands.when_mentioned_or(gp)(the_bot, message)
|
||||
|
||||
|
||||
|
@ -47,19 +56,28 @@ class Nomen(Bot):
|
|||
intents=options.get("intents"),
|
||||
)
|
||||
|
||||
# self.db = self.loop.run_until_complete(sqlite.connect("nomen.db"))
|
||||
self.db = self.loop.run_until_complete(setup_db(DB_FILE))
|
||||
self.prefixes = {}
|
||||
|
||||
async def get_guild_prefix(self, guild: Guild):
|
||||
if guild.id in self.prefixes:
|
||||
return self.prefixes[guild.id]
|
||||
# FIXME: Fetch with SQLite
|
||||
|
||||
# self.prefixes[guild.id] = prefix
|
||||
return DEFAULT_PREFIX
|
||||
cur = await self.db.execute(
|
||||
"SELECT prefix FROM guilds WHERE guild_id=? LIMIT 1",
|
||||
[guild.id],
|
||||
)
|
||||
prefix = await cur.fetchone() or [DEFAULT_PREFIX]
|
||||
prefix = prefix[0]
|
||||
|
||||
self.prefixes[guild.id] = prefix
|
||||
return prefix
|
||||
|
||||
async def set_guild_prefix(self, guild: Guild, prefix):
|
||||
# FIXME: Fetch with SQLite
|
||||
await self.db.execute(
|
||||
"REPLACE INTO guilds VALUES(?, ?)",
|
||||
(guild.id, prefix),
|
||||
)
|
||||
self.prefixes[guild.id] = prefix
|
||||
|
||||
async def close(self):
|
||||
|
@ -80,6 +98,9 @@ bot = Nomen(
|
|||
),
|
||||
)
|
||||
|
||||
bot.add_cog(Notifications(bot))
|
||||
bot.add_cog(Settings(bot))
|
||||
|
||||
|
||||
@bot.event
|
||||
async def on_ready():
|
||||
|
@ -94,25 +115,16 @@ async def on_command_error(ctx, error):
|
|||
if isinstance(error, commands.CommandNotFound):
|
||||
return
|
||||
|
||||
|
||||
async def handle_triggers(ctx, message):
|
||||
log.debug(f"Handling triggers for message '{ctx.message.content}'")
|
||||
|
||||
|
||||
@bot.event
|
||||
async def on_message(message):
|
||||
if message.author == bot.user:
|
||||
if isinstance(error, commands.NoPrivateMessage):
|
||||
await ctx.send(error)
|
||||
return
|
||||
|
||||
ctx = await bot.get_context(message)
|
||||
if ctx.valid:
|
||||
await bot.invoke(ctx)
|
||||
else:
|
||||
await handle_triggers(ctx, message)
|
||||
log.exception(error)
|
||||
await ctx.send(error)
|
||||
|
||||
|
||||
@bot.command()
|
||||
@commands.check(commands.is_owner())
|
||||
@commands.is_owner()
|
||||
async def echo(ctx, arg):
|
||||
await ctx.send(arg)
|
||||
|
||||
|
@ -123,13 +135,20 @@ async def ping(ctx):
|
|||
|
||||
|
||||
@bot.command()
|
||||
@commands.check(commands.guild_only())
|
||||
@commands.guild_only()
|
||||
@commands.is_owner()
|
||||
async def prefix(ctx, prefix=None):
|
||||
if not prefix:
|
||||
if prefix is None:
|
||||
prefix = await ctx.bot.get_guild_prefix(ctx.guild)
|
||||
if prefix == "":
|
||||
await ctx.send(f"{ctx.guild.name} prefix is mentioning me only")
|
||||
else:
|
||||
await ctx.send(f"{ctx.guild.name} prefix is `{prefix}`")
|
||||
else:
|
||||
await ctx.bot.set_guild_prefix(ctx.guild, prefix)
|
||||
if prefix == "":
|
||||
await ctx.send(f"Set {ctx.guild.name} prefix to mentioning me only")
|
||||
else:
|
||||
await ctx.send(f"Set {ctx.guild.name} prefix to `{prefix}`")
|
||||
|
||||
|
||||
|
|
|
@ -1,15 +1,237 @@
|
|||
from discord.ext.commands import Cog, command
|
||||
import logging
|
||||
from asyncio import TaskGroup
|
||||
|
||||
from disnake import Embed
|
||||
from disnake.ext.commands import Cog, group, guild_only
|
||||
|
||||
from .utils import can_view, confirm, test_keyword, unpack
|
||||
|
||||
log = logging.getLogger("nomen.notifications")
|
||||
log.setLevel(logging.INFO)
|
||||
|
||||
|
||||
async def handle_notification(db_updates, ctx, message, keyword, user_id):
|
||||
member = await ctx.guild.getch_member(user_id)
|
||||
|
||||
if not await can_view(ctx, member):
|
||||
log.debug(f"- - Missing permission {user_id}")
|
||||
return
|
||||
|
||||
log.debug(f"- - Notifying {user_id}")
|
||||
db_updates.append((ctx.guild.id, keyword, user_id))
|
||||
|
||||
footer = f"\n\n<t:{int(message.created_at.timestamp())}:R> | [Show]({message.jump_url}) | <#{ctx.channel.id}>"
|
||||
|
||||
embed = Embed(
|
||||
description=message.content + footer,
|
||||
)
|
||||
embed.set_author(name=message.author, icon_url=message.author.display_avatar)
|
||||
|
||||
await member.send(
|
||||
f":bell: `{message.author}` mentioned `{keyword}` on `{ctx.guild}`:",
|
||||
embed=embed,
|
||||
)
|
||||
|
||||
|
||||
async def handle_triggers(ctx, message):
|
||||
cur = await ctx.bot.db.execute("SELECT disabled FROM users WHERE user_id=?", (ctx.author.id,))
|
||||
res = await cur.fetchone()
|
||||
|
||||
if res and res[0]:
|
||||
log.debug(f"User {ctx.author} ({ctx.author.id}) opted out")
|
||||
return
|
||||
|
||||
handled_users = [ctx.author.id]
|
||||
db_updates = []
|
||||
|
||||
async with TaskGroup() as tg:
|
||||
async with ctx.bot.db.execute(
|
||||
"""SELECT keyword, user_id
|
||||
FROM keywords LEFT JOIN users USING (user_id)
|
||||
WHERE disabled IS NOT 1 AND guild_id=? AND contains(?, keyword, regex)""",
|
||||
(ctx.guild.id, message.content),
|
||||
) as cur:
|
||||
async for keyword, user_id in cur:
|
||||
log.debug(f"- Handling '{keyword}' for {user_id}")
|
||||
|
||||
if user_id in handled_users:
|
||||
log.debug(f"- - Ignoring {user_id}")
|
||||
continue
|
||||
|
||||
handled_users.append(user_id)
|
||||
|
||||
tg.create_task(handle_notification(db_updates, ctx, message, keyword, user_id))
|
||||
|
||||
await ctx.bot.db.executemany(
|
||||
"UPDATE keywords SET count = count + 1 WHERE guild_id=? AND keyword=? AND user_id=?",
|
||||
db_updates,
|
||||
)
|
||||
await ctx.bot.db.commit()
|
||||
|
||||
class User:
|
||||
def __init__(self, ...):
|
||||
pass
|
||||
|
||||
class Notifications(Cog):
|
||||
"""
|
||||
Notifications!
|
||||
"""
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
|
||||
@Cog.listener()
|
||||
async def on_message(self, message):
|
||||
ctx = self.bot.get_context(message)
|
||||
if ctx.valid: return
|
||||
await handle_triggers(message)
|
||||
if message.author == self.bot.user:
|
||||
return
|
||||
|
||||
ctx = await self.bot.get_context(message)
|
||||
if ctx.valid or ctx.guild is None:
|
||||
return
|
||||
|
||||
await handle_triggers(ctx, message)
|
||||
|
||||
@group(
|
||||
aliases=["kw", "notifications", "notif", "noti"],
|
||||
invoke_without_command=True,
|
||||
)
|
||||
async def keyword(self, ctx):
|
||||
"""
|
||||
Notification keywords
|
||||
"""
|
||||
await ctx.send_help(self.keyword)
|
||||
|
||||
async def _add_keyword(self, ctx, keyword, regex):
|
||||
log.debug(
|
||||
f"Adding {'regex' if regex else 'keyword'}: {keyword} of {ctx.author} ({ctx.author.id}) on {ctx.guild} ({ctx.guild.id})"
|
||||
)
|
||||
|
||||
if test_keyword(keyword, regex):
|
||||
await ctx.send(f"{'Regex' if regex else 'Keyword'} matches a word that is too common")
|
||||
return
|
||||
|
||||
cur = await ctx.bot.db.execute(
|
||||
"SELECT keyword FROM keywords WHERE guild_id=? AND user_id=? AND contains(?, keyword, regex)",
|
||||
(ctx.guild.id, ctx.author.id, keyword),
|
||||
)
|
||||
|
||||
conflicts = unpack(await cur.fetchall())
|
||||
|
||||
if conflicts:
|
||||
await ctx.send(f"Any instance of keyword `{keyword}` would be matched by existing keywords (check DMs)")
|
||||
await ctx.author.send(
|
||||
f"Conflicts with keyword `{keyword}`:\n" + "\n".join(f"- `{conflict}`" for conflict in conflicts)
|
||||
)
|
||||
return
|
||||
|
||||
await ctx.bot.db.execute(
|
||||
"INSERT INTO keywords (guild_id, keyword, user_id, regex) VALUES (?, ?, ?, ?)",
|
||||
(ctx.guild.id, keyword, ctx.author.id, regex),
|
||||
)
|
||||
await ctx.bot.db.commit()
|
||||
await ctx.send(f"Added `{keyword}` to your list of keywords")
|
||||
|
||||
@keyword.command()
|
||||
@guild_only()
|
||||
async def add(self, ctx, keyword):
|
||||
"""
|
||||
Adds a notification keyword
|
||||
|
||||
Use quotes for a keyword with spaces!
|
||||
"""
|
||||
|
||||
await self._add_keyword(ctx, keyword, False)
|
||||
|
||||
@keyword.command()
|
||||
@guild_only()
|
||||
async def regex(self, ctx, keyword):
|
||||
"""
|
||||
Adds a notification regex
|
||||
|
||||
Use quotes for a regex with spaces!
|
||||
"""
|
||||
|
||||
await self._add_keyword(ctx, keyword, True)
|
||||
|
||||
@keyword.command()
|
||||
@guild_only()
|
||||
async def remove(self, ctx, keyword):
|
||||
"""
|
||||
Removes a keyword or regex
|
||||
|
||||
Must be identical to the keyword or regex you intend on removing
|
||||
"""
|
||||
|
||||
log.debug(f"Removing keyword: {keyword} of {ctx.author} (ctx.author.id) on {ctx.guild} ({ctx.guild.id})")
|
||||
await ctx.bot.db.execute(
|
||||
"DELETE FROM keywords WHERE guild_id=? AND keyword=? AND user_id=?",
|
||||
(ctx.guild.id, keyword, ctx.author.id),
|
||||
)
|
||||
await ctx.bot.db.commit()
|
||||
await ctx.send(f"Removed `{keyword}` from your list of keywords")
|
||||
|
||||
@keyword.command()
|
||||
@guild_only()
|
||||
async def list(self, ctx):
|
||||
"""
|
||||
Lists keywords and regexes, with trigger count
|
||||
"""
|
||||
|
||||
log.debug(f"Listing keywords: {ctx.author} ({ctx.author.id}) on {ctx.guild} ({ctx.guild.id})")
|
||||
|
||||
async with ctx.bot.db.execute(
|
||||
"SELECT keyword, count, regex FROM keywords WHERE guild_id=? AND user_id=?",
|
||||
(ctx.guild.id, ctx.author.id),
|
||||
) as cur:
|
||||
keywords = await cur.fetchall()
|
||||
|
||||
embed = Embed(
|
||||
description="\n".join(
|
||||
f"{'regex '*regex}`{keyword}`: notified `{count}` times" for keyword, count, regex in keywords
|
||||
)
|
||||
)
|
||||
embed.set_author(name=f"Your keywords on {ctx.guild}:", icon_url=ctx.guild.icon)
|
||||
|
||||
await ctx.author.send(embed=embed)
|
||||
await ctx.send("Please check your direct messages")
|
||||
|
||||
@keyword.command()
|
||||
@guild_only()
|
||||
async def clear(self, ctx):
|
||||
"""
|
||||
Clears keywords and regexes for this server
|
||||
"""
|
||||
|
||||
log.debug(f"Clearing keywords: {ctx.author} ({ctx.author.id}) on {ctx.guild} ({ctx.guild.id})")
|
||||
|
||||
to_clear = await confirm(
|
||||
ctx, "Are you sure you want to clear your keywords on {ctx.guild}? (Reply with yes/no)", True
|
||||
)
|
||||
|
||||
if to_clear is None:
|
||||
return await ctx.send("Timed out or received an invalid response", delete_after=10)
|
||||
elif not to_clear:
|
||||
return await ctx.send("Canceled", delete_after=10)
|
||||
|
||||
await ctx.send("Clearing keywords")
|
||||
|
||||
await ctx.bot.db.execute(
|
||||
"DELETE FROM KEYWORDS WHERE guild_id=? AND user_id=?",
|
||||
(ctx.guild.id, ctx.author.id),
|
||||
)
|
||||
await ctx.bot.db.commit()
|
||||
|
||||
@keyword.command()
|
||||
@guild_only()
|
||||
async def pause(self, ctx):
|
||||
pass
|
||||
# TODO: Pause guild notifications
|
||||
|
||||
@keyword.group(invoke_without_command=True)
|
||||
@guild_only()
|
||||
async def ignore(self, ctx, channel):
|
||||
pass
|
||||
# TODO: Ignore channels
|
||||
|
||||
@ignore.cmmand()
|
||||
@guild_only()
|
||||
async def active(self, ctx):
|
||||
pass
|
||||
# TODO: Ignore active channel
|
||||
|
|
76
nomen/settings.py
Normal file
76
nomen/settings.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
import logging
|
||||
|
||||
from disnake.ext.commands import Cog, group, guild_only
|
||||
|
||||
from .utils import confirm
|
||||
|
||||
log = logging.getLogger("nomen.settings")
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class Settings(Cog):
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
|
||||
@group(invoke_without_command=True)
|
||||
async def nomen(self, ctx):
|
||||
"""
|
||||
Settings for Nomen
|
||||
"""
|
||||
|
||||
await ctx.send_help(self.nomen)
|
||||
|
||||
@nomen.command()
|
||||
async def purge(self, ctx):
|
||||
"""
|
||||
Deletes all user data stored in Nomen
|
||||
"""
|
||||
pass
|
||||
|
||||
@nomen.command(name="import")
|
||||
@guild_only()
|
||||
async def _import(self, ctx):
|
||||
"""
|
||||
Imports a CSV of all of your user data
|
||||
"""
|
||||
pass
|
||||
|
||||
@nomen.command()
|
||||
@guild_only()
|
||||
async def export(self, ctx):
|
||||
"""
|
||||
Exports a CSV of all of your user data
|
||||
"""
|
||||
pass
|
||||
|
||||
@nomen.command(name="opt-out")
|
||||
async def opt_out(self, ctx):
|
||||
"""
|
||||
Opt-out of Nomen processing your messages entirely
|
||||
|
||||
You will not trigger anyone else's notifications, and you will not receive any notifications
|
||||
"""
|
||||
|
||||
log.debug(f"Opting-out: {ctx.author} ({ctx.author.id})")
|
||||
|
||||
await ctx.send(
|
||||
f"You have now opted-out and will no longer trigger or receive notifications. To opt back in, run `{ctx.clean_prefix}nomen opt-in`"
|
||||
)
|
||||
|
||||
await self.bot.db.execute("REPLACE INTO users VALUES(?, 1)", (ctx.author.id,))
|
||||
await self.bot.db.commit()
|
||||
|
||||
@nomen.command(name="opt-in")
|
||||
async def opt_in(self, ctx):
|
||||
"""
|
||||
Opt-in to Nomen processing your messages
|
||||
"""
|
||||
|
||||
log.debug(f"Opting-in: {ctx.author} ({ctx.author.id})")
|
||||
|
||||
await ctx.send(
|
||||
f"You have opted back in and will now trigger and receive notifications. To opt out, run `{ctx.clean_prefix}nomen opt-out`"
|
||||
)
|
||||
|
||||
await self.bot.db.execute("REPLACE INTO users VALUES(?, 0)", (ctx.author.id,))
|
||||
await self.bot.db.commit()
|
178
nomen/utils.py
Normal file
178
nomen/utils.py
Normal file
|
@ -0,0 +1,178 @@
|
|||
from asyncio import TimeoutError
|
||||
from itertools import chain
|
||||
|
||||
import re2 as re
|
||||
from disnake import ChannelType
|
||||
|
||||
ALPHABET = list("abcdefghijklmnopqrstuvwxyz")
|
||||
|
||||
COMMON_WORDS = [
|
||||
# common words
|
||||
"of",
|
||||
"in",
|
||||
"is",
|
||||
"to",
|
||||
"it",
|
||||
"as",
|
||||
"on",
|
||||
"by",
|
||||
"or",
|
||||
"be",
|
||||
"an",
|
||||
"at",
|
||||
"if",
|
||||
"up",
|
||||
"so",
|
||||
"do",
|
||||
"th",
|
||||
"no",
|
||||
"de",
|
||||
"the",
|
||||
"and",
|
||||
"was",
|
||||
"for",
|
||||
"that",
|
||||
"are",
|
||||
"with",
|
||||
"from",
|
||||
"this",
|
||||
"not",
|
||||
"also",
|
||||
"has",
|
||||
"were",
|
||||
"which",
|
||||
"have",
|
||||
"people",
|
||||
"one",
|
||||
"can",
|
||||
# pronouns
|
||||
"you",
|
||||
"your",
|
||||
"yours",
|
||||
"yourself",
|
||||
"he",
|
||||
"him",
|
||||
"his",
|
||||
"himself",
|
||||
"she",
|
||||
"her",
|
||||
"hers",
|
||||
"herself",
|
||||
"they",
|
||||
"them",
|
||||
"theirs",
|
||||
"themself",
|
||||
"themselves",
|
||||
]
|
||||
|
||||
TESTS = list(
|
||||
chain(
|
||||
ALPHABET, # single letters
|
||||
COMMON_WORDS,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
regex_cache = {}
|
||||
|
||||
|
||||
def compile_keyword(keyword, regex):
|
||||
if reg := regex_cache.get((keyword, regex), None):
|
||||
return reg
|
||||
|
||||
if not regex:
|
||||
keyword = re.escape(keyword)
|
||||
|
||||
reg = re.compile(rf"(?i)\b{keyword}\b")
|
||||
regex_cache[(keyword, regex)] = reg
|
||||
return reg
|
||||
|
||||
|
||||
def contains(string, keyword, regex):
|
||||
return compile_keyword(keyword, regex).search(string) is not None
|
||||
|
||||
|
||||
def test_keyword(keyword, regex):
|
||||
reg = compile_keyword(keyword, regex)
|
||||
|
||||
return any(map(lambda x: reg.search(x) is not None, TESTS))
|
||||
|
||||
|
||||
def first(tpl):
|
||||
return tpl[0]
|
||||
|
||||
|
||||
def unpack(lst_of_tpl):
|
||||
"""Takes a list of tuples and maps to a list of their first elements"""
|
||||
return list(map(first, lst_of_tpl))
|
||||
|
||||
|
||||
async def in_thread(member, thread):
|
||||
# FIXME: Currently overlooks the situation where a moderator isn't in a thread but has manage threads
|
||||
return any(member.id == thread_member.id for thread_member in await thread.fetch_members())
|
||||
|
||||
|
||||
async def can_view(ctx, member) -> bool:
|
||||
if ctx.channel.type == ChannelType.private_thread and not in_thread(member, ctx.channel):
|
||||
return False
|
||||
|
||||
return ctx.channel.permissions_for(member).view_channel
|
||||
|
||||
|
||||
# ===== Borrowed from https://github.com/avrae/avrae, GPLv3 licensed =====
|
||||
|
||||
|
||||
def list_get(index, default, l):
|
||||
try:
|
||||
return l[index]
|
||||
except IndexError:
|
||||
return default
|
||||
|
||||
|
||||
def get_positivity(string):
|
||||
if isinstance(string, bool): # oi!
|
||||
return string
|
||||
lowered = string.lower()
|
||||
if lowered in ("yes", "y", "true", "t", "1", "enable", "on"):
|
||||
return True
|
||||
elif lowered in ("no", "n", "false", "f", "0", "disable", "off"):
|
||||
return False
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def auth_and_chan(ctx):
|
||||
def check(message):
|
||||
return message.author == ctx.author and message.channel == ctx.channel
|
||||
|
||||
return check
|
||||
|
||||
|
||||
async def confirm(ctx, message, delete_msgs=False, response_check=get_positivity):
|
||||
"""
|
||||
Confirms whether a user wants to take an action.
|
||||
|
||||
:rtype: bool|None
|
||||
:param ctx: The current Context.
|
||||
:param message: The message for the user to confirm.
|
||||
:param delete_msgs: Whether to delete the messages.
|
||||
:param response_check: A function (str) -> bool that returns whether a given reply is a valid response.
|
||||
:type response_check: (str) -> bool
|
||||
:return: Whether the user confirmed or not. None if no reply was recieved
|
||||
"""
|
||||
msg = await ctx.channel.send(message)
|
||||
try:
|
||||
reply = await ctx.bot.wait_for("message", timeout=30, check=auth_and_chan(ctx))
|
||||
except TimeoutError:
|
||||
return None
|
||||
reply_bool = response_check(reply.content) if reply is not None else None
|
||||
if delete_msgs:
|
||||
try:
|
||||
await msg.delete()
|
||||
await reply.delete()
|
||||
except:
|
||||
pass
|
||||
return reply_bool
|
||||
|
||||
|
||||
# ===== End code borrowed from Avrae =====
|
|
@ -4,7 +4,8 @@ version = "0.0.1"
|
|||
dependencies = [
|
||||
"disnake",
|
||||
"python-dotenv",
|
||||
"aiosqlite"
|
||||
"aiosqlite",
|
||||
"google-re2"
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
|
@ -16,3 +17,6 @@ find = {}
|
|||
[build-system]
|
||||
requires = ["setuptools"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 120
|
||||
|
|
Loading…
Reference in a new issue