import logging
from asyncio import TaskGroup
from textwrap import indent
from typing import Union
from disnake import Embed, Member, Thread, User
from disnake.abc import GuildChannel
from disnake.errors import Forbidden
from disnake.ext.commands import Cog, group, guild_only
from .utils import can_view, confirm, test_keyword
log = logging.getLogger("nomen.notifications")
log_ = log
class NotifierLogAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
return (
f"[guild: {self.extra["guild"]}, user: {self.extra["user"]}, keyword: {self.extra["keyword"]}] {msg}",
class KeywordError(Exception):
def __init__(self, msg=None, dm_msg=None, *args):
self.msg = msg
self.dm_msg = dm_msg
super().__init__(msg, dm_msg, *args)
async def send(self, ctx):
if self.msg:
await ctx.send(self.msg + " (check DMs)")
if self.dm_msg:
await ctx.author.send(self.dm_msg)
async def handle_notification(db_updates, ctx, message, keyword, user_id, use_embed):
Async task to dispatch a notification
member = await ctx.guild.getch_member(user_id)
log = NotifierLogAdapter(log_.getChild("notifier"), {"guild": ctx.guild, "user": member, "keyword": keyword})
log.debug("Handling notification")
if not await can_view(ctx, member):
log.debug("Missing permission")
db_updates.append((ctx.guild.id, keyword, user_id))
header = f"🔔 `{message.author.display_name}` mentioned `{keyword}` on `{ctx.guild}`:"
footer = f"\n<t:{int(message.created_at.timestamp())}:R> | [Show]({message.jump_url}) | <#{ctx.channel.id}>"
if use_embed:
log.debug("Sending embed")
embed = Embed(description=message.content + "\n" + footer)
name=f"{message.author.display_name} ({message.author})",
await member.send(header, embed=embed)
except Forbidden:
log.warning("Cannot send messages to this user")
log.debug("Sending plain message")
await member.send("\n".join((header, indent(message.content, "> ", lambda line: True).strip(), footer)))
except Forbidden:
log.warning("Cannot send messages to this user")
async def handle_triggers(ctx, message):
Main function that handles message triggers
log.debug(f"Handling triggers for '{message.content}' from {ctx.author}")
params = {
"author": ctx.author.id,
"channel": ctx.channel.id,
"category": ctx.channel.category_id,
"guild": ctx.guild.id,
"content": message.content,
"is_bot": ctx.author.bot,
"parent": 0,
if isinstance(ctx.channel, Thread) and ctx.channel.parent:
params["parent"] = ctx.channel.parent.id
disabled = await ctx.bot.db.fetch_exists(
"SELECT * FROM users WHERE user_id=:author AND unlikely(disabled IS 1)", params
if disabled:
log.debug(f"User {ctx.author} ({ctx.author.id}) opted out")
db_updates = []
search_query = """
SELECT keyword, user_id, use_embed
FROM ( -- Deduplicate notification recipients by user_id
SELECT keyword, user_id, use_embed, row_number() over (partition by user_id) n
FROM keywords
LEFT JOIN users USING (user_id)
WHERE likely(disabled IS NOT 1) -- Don't notify users who opted out
AND (unlikely(notify_self IS 1) OR user_id IS NOT :author) -- Don't notify author unless wanted
AND (unlikely(bots_notify IS 1) OR :is_bot IS NOT 1) -- Don't notify from bots unless wanted
AND likely(NOT EXISTS(SELECT * FROM ( -- Don't notify if...
SELECT target FROM user_ignores -- Author or channel is ignored in this guild
WHERE user_id=user_id AND guild_id=:guild
SELECT target FROM user_blocks -- Author is blocked
WHERE user_id=user_id
) WHERE target IN (:author, :channel, :category, :parent)))
AND likely(NOT EXISTS(SELECT * FROM user_pauses WHERE user_id=user_id AND guild_id=:guild))
AND guild_id=:guild AND unlikely(contains(:content, keyword, regex))
async with TaskGroup() as tg:
async with ctx.bot.db.execute(search_query, params) as cur:
async for keyword, user_id, use_embed in cur:
log.debug(f"- Creating notification task: '{keyword}' for {user_id}")
tg.create_task(handle_notification(db_updates, ctx, message, keyword, user_id, use_embed))
log.debug("Incrementing notification counts")
await ctx.bot.db.executemany(
"UPDATE keywords SET count = count + 1 WHERE guild_id=? AND keyword=? AND user_id=?",
await ctx.bot.db.commit()
class Notifications(Cog):
def __init__(self, bot):
self.bot = bot
async def on_message(self, message):
if message.author == self.bot.user:
ctx = await self.bot.get_context(message)
if ctx.valid or ctx.guild is None:
await handle_triggers(ctx, message)
async def cog_before_invoke(self, ctx):
await ctx.bot.db.execute("INSERT OR IGNORE INTO users (user_id) VALUES(?)", (ctx.author.id,))
aliases=["kw", "notification", "notifications", "notif", "noti"],
async def keyword(self, ctx):
Notification keywords
await ctx.send_help(self.keyword)
async def _check_conflicts(self, guild_id, user_id, keyword):
return await self.bot.db.fetch_unpacked(
"SELECT keyword FROM keywords WHERE guild_id=? AND user_id=? AND contains(?, keyword, regex)",
(guild_id, user_id, keyword),
async def _check_redundants(self, guild_id, user_id, keyword, regex):
return await self.bot.db.fetch_unpacked(
"SELECT keyword FROM keywords WHERE guild_id=? AND user_id=? AND contains(keyword, ?, ?)",
(guild_id, user_id, keyword, regex),
async def add_keyword(self, guild_id: int, user_id: int, keyword: str, regex: bool, initial_count: int = 0):
f"Adding {'regex' if regex else 'keyword'}: {keyword} of {user_id} on {guild_id} with count {initial_count}"
if test_keyword(keyword, regex):
log.debug("Keyword too common")
raise KeywordError(f"{'Regex' if regex else 'Keyword'} matches a word that is too common")
if conflicts := await self._check_conflicts(guild_id, user_id, keyword):
log.debug("Keyword conflicts with existing keyword")
raise KeywordError(
f"Any instance of `{keyword}` would be matched by existing keywords",
f"Conflicts with keyword `{keyword}`:\n" + "\n".join(f"- `{conflict}`" for conflict in conflicts),
if redundants := await self._check_redundants(guild_id, user_id, keyword, regex):
log.debug("Keyword renders existing redundant")
raise KeywordError(
f"Adding `{keyword}` will cause existing keywords to never match",
f"Keywords redundant from `{keyword}`:\n" + "\n".join(f" - `{conflict}`" for conflict in redundants),
log.debug("Keyword valid, adding")
await self.bot.db.execute(
"INSERT INTO keywords VALUES (?, ?, ?, ?, ?)", (guild_id, keyword, user_id, regex, initial_count)
await self.bot.db.commit()
async def add(self, ctx, keyword):
Adds a notification keyword
Use quotes for a keyword with spaces!
await self.add_keyword(ctx.guild.id, ctx.author.id, keyword, False)
except KeywordError as e:
await e.send(ctx)
await ctx.send(f"Added `{keyword}` to your list of keywords")
async def regex(self, ctx, keyword):
Adds a notification regex
Use quotes for a regex with spaces!
# TODO: Add regex names to make notifications cleaner
await self.add_keyword(ctx.guild.id, ctx.author.id, keyword, True)
except KeywordError as e:
await e.send(ctx)
await ctx.send(f"Added regex `{keyword}` to your list of keywords")
@keyword.command(aliases=["delete", "del"])
async def remove(self, ctx, keyword):
Removes a keyword or regex
Must be identical to the keyword or regex you intend on removing
log.debug(f"Removing keyword: {keyword} of {ctx.author} (ctx.author.id) on {ctx.guild} ({ctx.guild.id})")
await ctx.bot.db.execute(
"DELETE FROM keywords WHERE guild_id=? AND keyword=? AND user_id=?",
(ctx.guild.id, keyword, ctx.author.id),
await ctx.bot.db.commit()
await ctx.send(f"Removed `{keyword}` from your list of keywords")
async def list(self, ctx):
Lists keywords and regexes, with trigger count
log.debug(f"Listing keywords: {ctx.author} ({ctx.author.id}) on {ctx.guild} ({ctx.guild.id})")
keywords = await ctx.bot.db.execute_fetchall(
"SELECT keyword, count, regex FROM keywords WHERE guild_id=? AND user_id=?", (ctx.guild.id, ctx.author.id)
embed = Embed(
f"{'regex '*regex}`{keyword}`: notified `{count}` times" for keyword, count, regex in keywords
embed.set_author(name=f"Your keywords on {ctx.guild}:", icon_url=ctx.guild.icon)
await ctx.author.send(embed=embed)
await ctx.send("Please check your direct messages")
async def clear(self, ctx):
Clears keywords and regexes for this server
log.debug(f"Clearing keywords: {ctx.author} ({ctx.author.id}) on {ctx.guild} ({ctx.guild.id})")
to_clear = await confirm(
ctx, "Are you sure you want to clear your keywords on {ctx.guild}? (Reply with yes/no)", True
if to_clear is None:
return await ctx.send("Timed out or received an invalid response", delete_after=10)
elif not to_clear:
return await ctx.send("Canceled", delete_after=10)
await ctx.send("Clearing keywords")
await ctx.bot.db.execute(
"DELETE FROM KEYWORDS WHERE guild_id=? AND user_id=?",
(ctx.guild.id, ctx.author.id),
await ctx.bot.db.commit()
async def pause(self, ctx):
Pause notifications in this guild
params = (ctx.author.id, ctx.guild.id)
if await ctx.bot.db.fetch_exists("SELECT * FROM user_pauses WHERE user_id=? AND guild_id=?", params):
await ctx.send(f"Notifications in {ctx.guild} are already paused")
await ctx.bot.db.execute("INSERT INTO user_pauses VALUES(?, ?)", params)
await ctx.bot.db.commit()
await ctx.send(f"Paused notifications in {ctx.guild}")
async def resume(self, ctx):
Resume notifications in this guild
params = (ctx.author.id, ctx.guild_id)
if await ctx.bot.db.fetch_exists("SELECT * FROM user_pauses WHERE user_id=? AND guild_id=?", params):
await ctx.bot.db.execute("DELETE FROM user_pauses WHERE user_id=? AND guild_id=?", params)
await ctx.bot.db.commit()
await ctx.send(f"Resumed notifications in {ctx.guild}")
await ctx.send(f"Notifications in {ctx.guild} aren't paused")
async def ignore(self, ctx, target: Union[GuildChannel, Member]):
Ignore a channel or member in this guild
This prevents you from receiving any notifications from that channel or user in this guild
params = (ctx.author.id, ctx.guild.id, target.id)
if await ctx.bot.db.fetch_exists(
"SELECT * FROM user_ignores WHERE user_id=? AND guild_id=? AND target=?", params
await ctx.send(f"Target `{repr(target)}` already ignored")
await ctx.bot.db.execute("INSERT INTO user_ignores VALUES(?, ?, ?)", params)
await ctx.bot.db.commit()
await ctx.send(f"Now ignoring `{repr(target)}`")
async def unignore(self, ctx, target: Union[GuildChannel, Member]):
Unignore a channel or member in this guild
This allows you to receive notifications from that channel or user again
params = (ctx.author.id, ctx.guild.id, target.id)
if not await ctx.bot.db.fetch_exists(
"SELECT * FROM user_ignores WHERE user_id=? AND guild_id=? AND target=?", params
await ctx.send(f"Target `{repr(target)}` is not currently being ignored")
await ctx.bot.db.execute("DELETE FROM user_ignores WHERE user_id=? AND guild_id=? AND target=?", params)
await ctx.bot.db.commit()
await ctx.send(f"No longer ignoring `{repr(target)}`")
async def block(self, ctx, target: User):
Block a user from triggering your notifications
This prevents you from receiving notifications from that user in any guild
NOTE: They will still receive notifications from you
await ctx.message.delete(delay=0) # Delete invoking message if able to
params = (ctx.author.id, ctx.target.id)
if await ctx.bot.db.fetch_exists("SELECT * FROM user_blocks WHERE user_id=? AND target=?", params):
await ctx.author.send(f"Target {target} already blocked")
await ctx.bot.db.execute(
"INSERT INTO user_blocks VALUES(?, ?)",
(ctx.author.id, target.id),
await ctx.bot.db.commit()
await ctx.author.send(f"Blocked {target}")
async def unblock(self, ctx, target: User):
Unblock a user
This allows you to receive notifications from that user again
await ctx.message.delete(delay=0) # Delete invoking message if able to
params = (ctx.author.id, ctx.target.id)
if await ctx.bot.db.fetch_exists("SELECT * FROM user_blocks WHERE user_id=? AND target=?", params):
await ctx.author.send(f"Target {target} already blocked")
await ctx.bot.db.execute("DELETE FROM user_ignores WHERE user_id=? AND target=?", params)
await ctx.bot.db.commit()
await ctx.author.send(f"Unblocked {target}")