nomen/nomen/utils.py

204 lines
4.4 KiB
Python

import logging
from asyncio import TimeoutError
from itertools import chain
import re2 as re
from disnake import ChannelType
# Logger used by the helpers in this module.
log = logging.getLogger("nomen.utils")

# Every lowercase ASCII letter followed by a single space, one character per entry.
ALPHABET = [*"abcdefghijklmnopqrstuvwxyz "]

# Short, frequently used words and pronouns, ending with one 100-character run of "a".
COMMON_WORDS = [
    # common words
    "of", "in", "is", "to", "it", "as", "on", "by", "or", "be",
    "an", "at", "if", "up", "so", "do", "th", "no", "de",
    "the", "and", "was", "for", "that", "are", "with", "from", "this",
    "not", "also", "has", "were", "which", "have", "people", "one", "can",
    # pronouns
    "you", "your", "yours", "yourself",
    "he", "him", "his", "himself",
    "she", "her", "hers", "herself",
    "they", "them", "theirs", "themself", "themselves",
    "a" * 100,
]

# Sample strings scanned by ``test_keyword``: the single characters first,
# then the word list.
TESTS = [*ALPHABET, *COMMON_WORDS]
# Compiled patterns, keyed by the exact (keyword, regex) pair the caller passed in.
regex_cache = {}


def compile_keyword(keyword, regex):
    """Return a compiled, case-insensitive, word-bounded pattern for ``keyword``.

    :param keyword: The keyword text, or a regular expression when ``regex`` is truthy.
    :param regex: Whether ``keyword`` should be treated as a regular expression.
        When falsy, the keyword is escaped so that it only matches literally.
    :return: The compiled pattern (``re`` is the module imported at the top of
        this file). Results are memoised in ``regex_cache``.
    """
    # The cache key must be the caller's original arguments. Keying on the
    # escaped keyword would make literal keywords with metacharacters miss the
    # cache forever, and would let a raw keyword such as r"a\.b" collide with
    # the escaped form of "a.b".
    cache_key = (keyword, regex)
    cached = regex_cache.get(cache_key)
    if cached is not None:
        return cached

    pattern = keyword if regex else re.escape(keyword)
    compiled = re.compile(rf"(?i)\b{pattern}\b")
    regex_cache[cache_key] = compiled
    return compiled
def contains(string, keyword, regex):
    """Report whether the pattern built from ``keyword`` is found in ``string``.

    The pattern comes from ``compile_keyword(keyword, regex)``.
    """
    matcher = compile_keyword(keyword, regex)
    found = matcher.search(string)
    return found is not None
def test_keyword(keyword, regex):
    """Return True if the keyword's pattern matches any sample string in ``TESTS``."""
    matcher = compile_keyword(keyword, regex)
    for sample in TESTS:
        if matcher.search(sample) is not None:
            return True
    return False
def first(tpl):
    """Return the element at index 0 of ``tpl``."""
    head = tpl[0]
    return head
def unpack(lst_of_tpl):
    """Build a list holding the first element of every tuple in ``lst_of_tpl``."""
    return [first(tpl) for tpl in lst_of_tpl]
async def fetch_unpacked(db, sql, params=None):
    """Run ``sql`` and return every row reduced to its first column.

    :param db: Object exposing an awaitable ``cursor()`` (presumably an async
        SQLite connection -- confirm against the caller).
    :param sql: The statement to execute.
    :param params: Parameters forwarded unchanged to ``execute``.
    :return: Whatever ``fetchall()`` yields once the row factory has been applied.
    """

    def _first_column(_cursor, row):
        # Collapse each fetched row to its leading column.
        return first(row)

    cursor = await db.cursor()
    cursor.row_factory = _first_column
    executed = await cursor.execute(sql, params)
    rows = await executed.fetchall()
    return rows
async def fetch_exists(db, sql, params=None):
    """Wrap ``sql`` in ``SELECT EXISTS(...)`` and return the single resulting value.

    :param db: Object exposing an awaitable ``execute_fetchall(sql, params)``.
    :param sql: The inner query whose existence is being tested.
    :param params: Parameters forwarded unchanged alongside the wrapped query.
    :return: The first column of the first returned row.
    """
    wrapped_query = f"SELECT EXISTS({sql})"
    rows = await db.execute_fetchall(wrapped_query, params)
    first_row = rows[0]
    return first_row[0]
async def in_thread(member, thread):
    """Return True if ``member.id`` equals the id of any member fetched from ``thread``."""
    # FIXME: Currently overlooks the situation where a moderator isn't in a thread but has manage threads
    thread_members = await thread.fetch_members()
    for thread_member in thread_members:
        if member.id == thread_member.id:
            return True
    return False
async def can_view(ctx, member) -> bool:
    """Return whether ``member`` can see the channel that ``ctx`` refers to.

    :param ctx: The current context; only ``ctx.channel`` is used.
    :param member: The member whose visibility is being checked.
    :return: False when the channel is a private thread that ``member`` has not
        joined; otherwise the ``view_channel`` permission computed for ``member``.
    """
    channel = ctx.channel
    if channel.type == ChannelType.private_thread:
        # ``in_thread`` is a coroutine function and must be awaited: an
        # un-awaited coroutine object is always truthy, which would make this
        # membership check silently never reject anyone.
        if not await in_thread(member, channel):
            return False
    return channel.permissions_for(member).view_channel
# ===== Borrowed from https://github.com/avrae/avrae, GPLv3 licensed =====
def list_get(index, default, l):
    """Return ``l[index]``, or ``default`` when that lookup raises ``IndexError``."""
    try:
        value = l[index]
    except IndexError:
        value = default
    return value
def get_positivity(string):
    """Interpret ``string`` as a yes/no answer.

    Booleans are returned unchanged. Otherwise the lowercased text is compared
    against the accepted affirmative and negative words.

    :return: True, False, or None when the text matches neither set of words.
    """
    if isinstance(string, bool):  # oi!
        return string

    normalized = string.lower()
    if normalized in ("yes", "y", "true", "t", "1", "enable", "on"):
        return True
    if normalized in ("no", "n", "false", "f", "0", "disable", "off"):
        return False
    return None
def auth_and_chan(ctx):
    """Build a predicate accepting messages with the same author and channel as ``ctx``."""

    def check(message):
        same_author = message.author == ctx.author
        return same_author and message.channel == ctx.channel

    return check
async def _delete_quietly(message):
    """Delete ``message``, ignoring ordinary failures (best-effort cleanup)."""
    try:
        await message.delete()
    except Exception:
        # Deliberately best-effort: failing to tidy up must not break the
        # confirmation flow. Only ``Exception`` is caught so that task
        # cancellation and interpreter-exit signals still propagate.
        pass


async def confirm(ctx, message, delete_msgs=False, response_check=get_positivity):
    """
    Confirms whether a user wants to take an action.

    Sends ``message`` to ``ctx.channel`` and waits up to 30 seconds for a reply
    from the same author in the same channel.

    :rtype: bool|None
    :param ctx: The current Context.
    :param message: The message for the user to confirm.
    :param delete_msgs: Whether to delete the prompt and the reply afterwards.
        Each deletion is attempted independently and failures are ignored.
    :param response_check: A function (str) -> bool that returns whether a given reply is a valid response.
    :type response_check: (str) -> bool
    :return: Whether the user confirmed or not. None if no reply was received.
    """
    msg = await ctx.channel.send(message)
    try:
        reply = await ctx.bot.wait_for("message", timeout=30, check=auth_and_chan(ctx))
    except TimeoutError:
        return None

    reply_bool = response_check(reply.content) if reply is not None else None

    if delete_msgs:
        # Delete separately so a failure on the prompt does not leave the
        # user's reply behind.
        await _delete_quietly(msg)
        await _delete_quietly(reply)
    return reply_bool
def cleanup_code(content):
    """Strip Markdown code fencing from ``content`` and return the bare code.

    A multi-line block wrapped in triple backticks loses its first line (the
    opening fence, including any language tag) and its last line (the closing
    fence). Anything else has surrounding backticks, spaces and newlines
    stripped.

    :param content: The raw text, possibly wrapped in backticks.
    :return: The text with the fencing removed.
    """
    # remove ```py\n```
    if content.startswith("```") and content.endswith("```"):
        lines = content.split("\n")
        # A fenced block written on a single line has no separate fence lines
        # to drop; slicing it would discard the code itself, so fall through
        # to the plain backtick strip instead.
        if len(lines) > 1:
            return "\n".join(lines[1:-1])

    # remove `foo`
    return content.strip("` \n")
# ===== End code borrowed from Avrae =====