220 lines
6.8 KiB
Python
220 lines
6.8 KiB
Python
import json
|
||
import logging
|
||
from typing import Literal
|
||
|
||
import disnake
|
||
import re2 as re
|
||
from disnake.ext.commands import Cog, group, guild_only
|
||
|
||
from .notifications import KeywordError
|
||
from .utils import confirm
|
||
|
||
# Module-level logger for this cog; its threshold is pinned to INFO here.
log = logging.getLogger("nomen.settings")
log.setLevel(logging.INFO)

# One line of a Dusty notification list: "<keyword> – notified <count> times".
# Group 1 is the keyword text, group 2 the decimal notification count.
DUSTY_DATA_LINE = re.compile(r"^(.+) – notified (\d+) times$")
|
||
|
||
|
||
class InvalidImportFormat(Exception):
    """Raised by an import parser when the supplied data is not in its format."""
|
||
|
||
|
||
class Settings(Cog):
    # Cog providing the `nomen` data-management command group (purge, import,
    # opt-out, opt-in) and the per-user `settings` command group.
    #
    # NOTE: the docstrings on the command callbacks below are kept verbatim
    # because the command framework uses them as user-visible help text.

    def __init__(self, bot):
        """Keep a reference to the bot; commands reach the database through ``bot.db``."""
        self.bot = bot

    @group(invoke_without_command=True)
    async def nomen(self, ctx):
        """
        Managing your data in Nomen
        """

        # Invoked without a subcommand: show the help for this group.
        await ctx.send_help(self.nomen)

    @nomen.command()
    async def purge(self, ctx):
        """
        Deletes all user data stored in Nomen
        """

        # NOTE(review): the prompt recommends `nomen export`, but the export
        # command further down in this class is still commented out.
        msg = (
            "Are you sure you wish to delete **ALL** data across **ALL** servers? (y/N)\n"
            "NOTE: If you have currently opted-out, this will opt you back in.\n"
            "\n"
            f"You may want to `{ctx.clean_prefix}nomen export` first"
        )

        if not await confirm(ctx, msg):
            await ctx.send("Cancelled.")
            return

        # Foreign key constraints will automatically cascade to delete dependent data
        await self.bot.db.execute("DELETE FROM users WHERE user_id=?", (ctx.author.id,))
        await self.bot.db.commit()
        await ctx.send("Deleted all user data.")

    async def import_json(self, ctx, data):
        """Recognise a JSON import payload.

        Only detection is implemented: the payload is parsed and acknowledged
        with a message, nothing is stored.

        Raises:
            InvalidImportFormat: if ``data`` is not valid JSON, or is valid
                JSON whose top level is not an object.
        """
        try:
            parsed = json.loads(data)
        except json.JSONDecodeError as e:
            raise InvalidImportFormat from e

        # The planned import layout is a JSON object; a bare number, string or
        # array is not an import payload even though it parses.
        if not isinstance(parsed, dict):
            raise InvalidImportFormat

        await ctx.send("Importing JSON")

    async def import_dusty(self, ctx, data):
        """Import keywords from a Dusty notification list into the current guild.

        Each non-blank line of ``data`` must match ``DUSTY_DATA_LINE``. The
        whole input is validated before anything is added, so a malformed
        line never results in a partial import.

        Raises:
            InvalidImportFormat: if any non-blank line does not match, or if
                ``data`` contains no entries at all.
        """
        to_add = []
        for raw_line in data.split("\n"):
            line = raw_line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline in a paste).
                continue
            match = DUSTY_DATA_LINE.fullmatch(line)
            if match is None:
                raise InvalidImportFormat
            to_add.append((match.group(1), int(match.group(2))))

        if not to_add:
            raise InvalidImportFormat

        await ctx.send(f"Importing from Dusty notification list to {ctx.guild}...")

        added = []
        errors = []

        async with ctx.typing():
            notifications = self.bot.get_cog("Notifications")
            for keyword, count in to_add:
                try:
                    await notifications.add_keyword(ctx.guild.id, ctx.author.id, keyword, False, count)
                except KeywordError as e:
                    errors.append(e)
                else:
                    added.append(keyword)

        if not errors:
            await ctx.send("Successfully imported all keywords!")
            return

        # Build the bullet lists outside the f-strings: reusing the enclosing
        # quote type and using backslashes inside a replacement field is a
        # SyntaxError before Python 3.12.
        added_list = "\n".join(f"- {keyword}" for keyword in added)
        error_list = "\n".join(f"- {e.msg}" for e in errors)

        await ctx.send(
            "Import successful, with some errors\n\n"
            f"Successfully imported:\n{added_list}\n"
            f"Errors (check PMs):\n{error_list}\n\n"
            "If you want to import erroring keywords, delete conflicting keywords and modify the list to only include the ones that failed to import, and run the command again."
        )

        # Each error carries a longer explanation that is sent privately.
        for e in errors:
            await ctx.author.send(e.dm_msg)

    async def import_raw(self, data):
        """Placeholder for a plain-list import; currently does nothing."""
        pass

    @nomen.command(name="import")
    @guild_only()
    async def _import(self, ctx, *, data):
        """
        Imports user data, either from JSON or from a simple list
        """

        # Planned JSON layout (JSON import is not implemented yet):
        # {
        #     "disabled": bool,
        #     "keywords": {"<guild_id>": [{"keyword": str, "regex": bool, "count": int}]},
        #     "ignores": {"<guild_id>": [int]},
        #     "blocks": [int],  # user IDs
        # }

        # Try JSON first; anything that is not JSON falls through to the Dusty
        # list parser.
        try:
            await self.import_json(ctx, data)
        except InvalidImportFormat:
            pass
        else:
            await ctx.send("Sorry, JSON imports and exports haven't been implemented yet.")
            return

        try:
            await self.import_dusty(ctx, data)
        except InvalidImportFormat:
            await ctx.send("Invalid import format")

    # TODO: export command, not implemented yet.
    # @nomen.command()
    # @guild_only()
    # async def export(self, ctx):
    #     """
    #     Exports a JSON of all of your user data
    #     """
    #     pass

    async def _set_disabled(self, user_id, disabled):
        """Store the opt-out flag for ``user_id`` and commit.

        Uses an upsert instead of ``REPLACE``: ``REPLACE`` deletes the existing
        row before inserting the new one, which resets every column not listed
        in the statement (the user's other settings). It presumably also
        triggers the foreign-key cascade that ``purge`` relies on, which would
        drop the user's dependent data -- confirm against the schema.
        Assumes ``users.user_id`` is the primary key / unique, which the
        previous ``REPLACE`` statement also depended on.
        """
        await self.bot.db.execute(
            "INSERT INTO users (user_id, disabled) VALUES(?, ?) "
            "ON CONFLICT(user_id) DO UPDATE SET disabled=excluded.disabled",
            (user_id, int(disabled)),
        )
        await self.bot.db.commit()

    @nomen.command(name="opt-out")
    async def opt_out(self, ctx):
        """
        Opt-out of Nomen processing your messages entirely

        You will not trigger anyone else's notifications, and you will not receive any notifications
        """

        if await ctx.bot.get_setting(ctx.author.id, "disabled"):
            await ctx.send("You are already opted-out")
            return

        log.debug("Opting-out: %s (%s)", ctx.author, ctx.author.id)

        # Persist first so the confirmation is only sent once the change is stored.
        await self._set_disabled(ctx.author.id, True)

        await ctx.send(
            f"You have now opted-out and will no longer trigger or receive notifications. To opt back in, run `{ctx.clean_prefix}nomen opt-in`"
        )

    @nomen.command(name="opt-in")
    async def opt_in(self, ctx):
        """
        Opt-in to Nomen processing your messages
        """

        if not await ctx.bot.get_setting(ctx.author.id, "disabled"):
            await ctx.send("You are already opted-in")
            return

        log.debug("Opting-in: %s (%s)", ctx.author, ctx.author.id)

        # Persist first so the confirmation is only sent once the change is stored.
        await self._set_disabled(ctx.author.id, False)

        await ctx.send(
            f"You have opted back in and will now trigger and receive notifications. To opt out, run `{ctx.clean_prefix}nomen opt-out`"
        )

    @group(invoke_without_command=True)
    async def settings(
        self,
        ctx,
        setting: Literal["use-embed", "notify-self", "bots-notify"],
        value: bool,
    ):
        """
        Change settings for Nomen

        Valid settings are:
        - `use-embed`
        - `notify-self`
        - `bots-notify`
        """
        # User-facing names use dashes; the stored setting names use underscores.
        setting_db = setting.replace("-", "_")

        await ctx.bot.set_setting(ctx.author.id, setting_db, value)
        # Echo the resulting settings back to the user.
        await self.settings_list(ctx)

    @settings.command(name="list")
    async def settings_list(self, ctx):
        """
        Print all of your current settings
        """

        rows = await ctx.bot.db.execute_fetchall(
            "SELECT use_embed, notify_self, bots_notify FROM users WHERE user_id=?", (ctx.author.id,)
        )

        # A user with no row in `users` has nothing to show; indexing the
        # empty result would raise IndexError.
        if not rows:
            await ctx.send("You don't have any saved settings yet.")
            return

        use_embed, notify_self, bots_notify = ("Yes" if value else "No" for value in rows[0])

        embed = disnake.Embed(title="Current Settings")
        embed.add_field("Use Embeds in Notifications (`use-embed`)", use_embed, inline=False)
        embed.add_field("Notify Self (`notify-self`)", notify_self, inline=False)
        embed.add_field("Notifications from Bots (`bots-notify`)", bots_notify, inline=False)

        await ctx.send(embed=embed)
|