From 214c6dc5981d43e76d12ea1fd358ac0e4da9ec72 Mon Sep 17 00:00:00 2001 From: tookender Date: Sat, 18 May 2024 17:33:04 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=A4=91=20Try=20working=20on=20logging=20s?= =?UTF-8?q?ystem?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 95 ++- data/schema.sql | 55 +- extensions/config/__init__.py | 9 + extensions/config/_base.py | 8 + extensions/config/levelling.py | 187 ++++++ extensions/config/logging.py | 1024 ++++++++++++++++++++++++++++++ extensions/logging/__init__.py | 13 + extensions/logging/_base.py | 199 ++++++ extensions/logging/join_leave.py | 84 +++ extensions/logging/member.py | 156 +++++ extensions/logging/message.py | 116 ++++ extensions/logging/server.py | 468 ++++++++++++++ extensions/logging/voice.py | 118 ++++ utils/context.py | 153 +++++ utils/logging.py | 159 +++++ utils/utils.py | 23 + 16 files changed, 2861 insertions(+), 6 deletions(-) create mode 100644 extensions/config/__init__.py create mode 100644 extensions/config/_base.py create mode 100644 extensions/config/levelling.py create mode 100644 extensions/config/logging.py create mode 100644 extensions/logging/__init__.py create mode 100644 extensions/logging/_base.py create mode 100644 extensions/logging/join_leave.py create mode 100644 extensions/logging/member.py create mode 100644 extensions/logging/message.py create mode 100644 extensions/logging/server.py create mode 100644 extensions/logging/voice.py create mode 100644 utils/context.py create mode 100644 utils/logging.py diff --git a/bot.py b/bot.py index e55f601..457f031 100644 --- a/bot.py +++ b/bot.py @@ -1,16 +1,36 @@ +import typing import logging import pathlib -from typing import List +from typing import List, Type +from collections import defaultdict import aiohttp import asyncpg import discord import mystbin as mystbin_library from discord.ext import commands +from utils.utils import col +from utils.logging import LoggingEventsFlags import config +class LoggingConfig: + __slots__ = ("default", "message", "member", "join_leave", "voice", "server") + + def __init__(self, default, message, member, join_leave, voice, server): + self.default = default + self.message = message + self.member = member + self.join_leave = join_leave + self.voice = voice + self.server = server + + def _replace(self, **kwargs): + for key, value in kwargs.items(): + setattr(self, key, value) + + class Korii(commands.AutoShardedBot): pool: asyncpg.Pool user: discord.ClientUser @@ -43,6 +63,19 @@ def __init__(self) -> None: self.ping_cooldown: commands.CooldownMapping = commands.CooldownMapping.from_cooldown(1, 5, commands.BucketType.user) self.levelling_cooldown: commands.CooldownMapping = commands.CooldownMapping.from_cooldown(1, 45, commands.BucketType.member) + self.log_webhooks: Type[LoggingConfig] = LoggingConfig + self.log_channels: typing.Dict[int, LoggingConfig] = {} + self.log_cache = defaultdict(lambda: defaultdict(list)) + self.guild_loggings: typing.Dict[int, LoggingEventsFlags] = {} + + def tick(self, boolean: bool | None): + if boolean == True: + return self.E["yes"] + elif boolean == False: + return self.E["no"] + else: + return self.E["hyphen"] + def fill(self, name: str, variable: list): file = open(name, encoding="utf-8") for line in file.readlines(): @@ -71,6 +104,32 @@ def bot_code(self): if "#" in line: self.comments += 1 + def update_log(self, deliver_type: str, webhook_url: str, guild_id: int): + guild_id = getattr(guild_id, "id", guild_id) + if deliver_type == "default": + self.log_channels[guild_id]._replace(default=webhook_url) + elif deliver_type == "message": + self.log_channels[guild_id]._replace(message=webhook_url) + elif deliver_type == "member": + self.log_channels[guild_id]._replace(member=webhook_url) + elif deliver_type == "join_leave": + self.log_channels[guild_id]._replace(join_leave=webhook_url) + elif deliver_type == "voice": + self.log_channels[guild_id]._replace(voice=webhook_url) + elif deliver_type == "server": + self.log_channels[guild_id]._replace(server=webhook_url) + + async def load_emojis(self) -> None: + await self.wait_until_ready() + EMOJI_GUILDS = [1036756543917527161, 1040293187354361857] + + for guild_id in EMOJI_GUILDS: + guild = self.get_guild(guild_id) + assert guild + emojis = guild.emojis + for emoji in emojis: + self.E[f"{emoji.name}"] = f"<:ghost:{emoji.id}>" + async def load_extensions(self) -> None: success = 0 failed = 0 @@ -103,7 +162,7 @@ async def load_extensions(self) -> None: return self.ext_logger.info(f"Loaded {success} out of {success + failed} extensions") async def setup_hook(self) -> None: - self.pool = await asyncpg.create_pool(config.DATABASE) # type: ignore + self.pool = await asyncpg.create_pool(config.DATABASE) # type: ignore if not self.pool: raise RuntimeError("Failed to connect with the database.") @@ -112,6 +171,7 @@ async def setup_hook(self) -> None: await self.pool.execute(file.read()) self.bot_code() + await self.load_emojis() await self.load_extensions() async def start(self) -> None: @@ -134,3 +194,34 @@ async def get_prefix(self, message: discord.Message, /) -> List[str] | str: prefixes.append("") return commands.when_mentioned_or(*prefixes)(self, message) + + async def populate_cache(self): + for entry in await self.pool.fetch("SELECT * FROM log_channels"): + guild_id = entry["guild_id"] + await self.pool.execute( + "INSERT INTO logging_events(guild_id) VALUES ($1) ON CONFLICT (guild_id) DO NOTHING", + entry["guild_id"], + ) + + self.log_channels[guild_id] = LoggingConfig( + default=entry["default_channel"], + message=entry["message_channel"], + join_leave=entry["join_leave_channel"], + member=entry["member_channel"], + voice=entry["voice_channel"], + server=entry["server_channel"], + ) + + flags = dict( + await self.pool.fetchrow( + "SELECT message_delete, message_purge, message_edit, member_join, member_leave, member_update, user_ban, user_unban, " + "user_update, invite_create, invite_delete, voice_join, voice_leave, voice_move, voice_mod, emoji_create, emoji_delete, " + "emoji_update, sticker_create, sticker_delete, sticker_update, server_update, stage_open, stage_close, channel_create, " + "channel_delete, channel_edit, role_create, role_delete, role_edit FROM logging_events WHERE guild_id = $1", + guild_id, + ) + ) + self.guild_loggings[guild_id] = LoggingEventsFlags(**flags) + + logging.info(f"{col(2)}All cache populated successfully") + self.dispatch("cache_ready") diff --git a/data/schema.sql b/data/schema.sql index a1c6f45..3ae36f4 100644 --- a/data/schema.sql +++ b/data/schema.sql @@ -35,7 +35,54 @@ CREATE TABLE IF NOT EXISTS guilds ( levelling_delete_after INT DEFAULT NULL ); -CREATE TABLE IF NOT EXISTS avatars ( - user_id BIGINT PRIMARY KEY, - avatar_url TEXT NOT NULL -) \ No newline at end of file +CREATE TABLE IF NOT EXISTS log_channels ( + guild_id BIGINT PRIMARY KEY, + default_channel TEXT, + default_chid BIGINT NOT NULL, + message_channel TEXT, + message_chid BIGINT, + join_leave_channel TEXT, + join_leave_chid BIGINT, + member_channel TEXT, + member_chid BIGINT, + voice_channel TEXT, + voice_chid BIGINT, + server_channel TEXT, + server_chid BIGINT, + CONSTRAINT fk_log_channels_guild_id FOREIGN KEY (guild_id) REFERENCES guilds(guild_id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS logging_events ( + guild_id BIGINT NOT PRIMARY KEY, + message_delete BOOLEAN DEFAULT true NOT NULL, + message_purge BOOLEAN DEFAULT true NOT NULL, + message_edit BOOLEAN DEFAULT true NOT NULL, + member_join BOOLEAN DEFAULT true NOT NULL, + member_leave BOOLEAN DEFAULT true NOT NULL, + member_update BOOLEAN DEFAULT true NOT NULL, + user_ban BOOLEAN DEFAULT true NOT NULL, + user_unban BOOLEAN DEFAULT true NOT NULL, + user_update BOOLEAN DEFAULT true NOT NULL, + invite_create BOOLEAN DEFAULT true NOT NULL, + invite_delete BOOLEAN DEFAULT true NOT NULL, + voice_join BOOLEAN DEFAULT true NOT NULL, + voice_leave BOOLEAN DEFAULT true NOT NULL, + voice_move BOOLEAN DEFAULT true NOT NULL, + voice_mod BOOLEAN DEFAULT true NOT NULL, + emoji_create BOOLEAN DEFAULT true NOT NULL, + emoji_delete BOOLEAN DEFAULT true NOT NULL, + emoji_update BOOLEAN DEFAULT true NOT NULL, + sticker_create BOOLEAN DEFAULT true NOT NULL, + sticker_delete BOOLEAN DEFAULT true NOT NULL, + sticker_update BOOLEAN DEFAULT true NOT NULL, + server_update BOOLEAN DEFAULT true NOT NULL, + stage_open BOOLEAN DEFAULT true NOT NULL, + stage_close BOOLEAN DEFAULT true NOT NULL, + channel_create BOOLEAN DEFAULT true NOT NULL, + channel_delete BOOLEAN DEFAULT true NOT NULL, + channel_edit BOOLEAN DEFAULT true NOT NULL, + role_create BOOLEAN DEFAULT true NOT NULL, + role_delete BOOLEAN DEFAULT true NOT NULL, + role_edit BOOLEAN DEFAULT true NOT NULL, + CONSTRAINT fk_logging_events_guild_id FOREIGN KEY (guild_id) REFERENCES guilds(guild_id) ON DELETE CASCADE +); diff --git a/extensions/config/__init__.py b/extensions/config/__init__.py new file mode 100644 index 0000000..1967429 --- /dev/null +++ b/extensions/config/__init__.py @@ -0,0 +1,9 @@ +from .levelling import LevellingConfig + + +class Config(LevellingConfig): + pass + + +async def setup(bot): + await bot.add_cog(Config(bot)) \ No newline at end of file diff --git a/extensions/config/_base.py b/extensions/config/_base.py new file mode 100644 index 0000000..f6220ee --- /dev/null +++ b/extensions/config/_base.py @@ -0,0 +1,8 @@ +from discord.ext import commands + +from bot import Korii + + +class ConfigBase(commands.Cog): + def __init__(self, bot: Korii): + self.bot = bot \ No newline at end of file diff --git a/extensions/config/levelling.py b/extensions/config/levelling.py new file mode 100644 index 0000000..ee99c53 --- /dev/null +++ b/extensions/config/levelling.py @@ -0,0 +1,187 @@ +import traceback +from typing import Optional + +import discord +from discord import app_commands + +from utils import Embed, Interaction, Invalid + +from ._base import ConfigBase + + +class Modal(discord.ui.Modal): + def __init__(self): + super().__init__() + + async def update_database(self, interaction: Interaction): + return + + async def on_error(self, interaction: Interaction, error: Exception): + if isinstance(error, Invalid): + return await interaction.response.send_message(str(error), ephemeral=True) + + async def on_submit(self, interaction: Interaction): + await self.update_database(interaction) + return await update_message(interaction) + + +class MessageModal(Modal, title="Config: Levelling"): + message = discord.ui.TextInput( + label="Announcement Message", + placeholder="Variables: {user} {user_mention} {level} {next_level} {guild}", + ) + + async def update_database(self, interaction: Interaction): + assert interaction.guild + return await interaction.client.pool.execute( + "UPDATE guilds SET levelling_message = $1 WHERE guild_id = $2", self.message.value, interaction.guild.id + ) + + +class LevellingChannelView(discord.ui.View): + def __init__(self): + super().__init__() + + @discord.ui.select( + cls=discord.ui.ChannelSelect, + placeholder="Select an announcement channel...", + max_values=1, + min_values=1, + channel_types=[ + discord.ChannelType.text, + discord.ChannelType.news, + discord.ChannelType.voice, + discord.ChannelType.private_thread, + discord.ChannelType.public_thread, + ], + ) + async def select_channel(self, interaction: Interaction, select: discord.ui.ChannelSelect): + assert interaction.guild and isinstance(interaction.user, discord.Member) + channel = interaction.guild.get_channel_or_thread(select.values[0].id) + assert isinstance(channel, discord.abc.Messageable) + + if not channel.permissions_for(interaction.user).send_messages: + return await interaction.response.send_message("You can't send messages in that channel.", ephemeral=True) + + await interaction.client.pool.execute("UPDATE guilds SET levelling_channel = $1 WHERE guild_id = $2", channel.id, interaction.guild.id) + return await update_message(interaction) + + +class XPMultiplierModal(Modal, title="Config: Levelling"): + multiplier = discord.ui.TextInput( + label="XP Multiplier", + placeholder="Multiplier of XP. Default is 1.0", + ) + + async def update_database(self, interaction: Interaction): + assert interaction.guild + multiplier = float(self.multiplier.value) + + if not multiplier or not isinstance(multiplier, float): + raise Invalid("Invalid multiplier. Must be a float (e.g. 2.0, 1.5, etc)") + + await interaction.client.pool.execute("UPDATE guilds SET levelling_multiplier = $1 WHERE guild_id = $2", multiplier, interaction.guild.id) + return await update_message(interaction) + + +class ConfigLevellingDropdown(discord.ui.Select): + def __init__(self): + options = [ + discord.SelectOption(label="Enable/Disable Levelling", description="Enable or disable levelling.", emoji="⚑"), + discord.SelectOption(label="Enable/Disable Announcements", description="Change if we should send a level-up message.", emoji="πŸ“’"), + discord.SelectOption( + label="Change Announcement Channel", description="Change where the message that is sent once you level-up.", emoji="#️⃣" + ), + discord.SelectOption(label="Change Announcement Message", description="Change the message that is sent once you level-up.", emoji="πŸ’¬"), + discord.SelectOption(label="Change XP Multiplier", description="Change the XP multiplier. Default is 1.0.", emoji="πŸ†"), + ] + + super().__init__(placeholder="Change an option...", min_values=1, max_values=1, options=options) + + async def callback(self, interaction: Interaction): + if interaction.guild: + data = await interaction.client.pool.fetchrow( + "SELECT levelling_enabled, levelling_announce, levelling_multiplier FROM guilds WHERE guild_id = $1", interaction.guild.id + ) + + match self.values[0]: + case "Enable/Disable Levelling": + await interaction.client.pool.execute( + "UPDATE guilds SET levelling_enabled = $1 WHERE guild_id = $2", False if data[0] else True, interaction.guild.id + ) + case "Enable/Disable Announcements": + await interaction.client.pool.execute( + "UPDATE guilds SET levelling_announce = $1 WHERE guild_id = $2", False if data[1] else True, interaction.guild.id + ) + case "Change Announcement Channel": + return await interaction.response.send_message(view=LevellingChannelView(), ephemeral=True) + case "Change Announcement Message": + return await interaction.response.send_modal(MessageModal()) + case "Change XP Multiplier": + return await interaction.response.send_modal(XPMultiplierModal()) + + return await update_message(interaction) + + +class ConfigLevelling(discord.ui.View): + def __init__(self): + super().__init__() + self.add_item(ConfigLevellingDropdown()) + + @discord.ui.button(label="View Announcement Message", emoji="πŸ’¬", style=discord.ButtonStyle.blurple) + async def view_message(self, interaction: Interaction, button: discord.ui.Button): + if interaction.guild: + message = await interaction.client.pool.fetchval("SELECT levelling_message FROM guilds WHERE guild_id = $1", interaction.guild.id) + + if not message: + return await interaction.response.send_message( + "No announcement message has been found. Use the dropdown to change it.", ephemeral=True + ) + + return await interaction.response.send_message(f"**Announcement Message:** `{message}`", ephemeral=True) + + +async def update_message(interaction: Interaction, edit: Optional[bool] = True): + try: + if interaction.guild: + data = await interaction.client.pool.fetchrow( + "SELECT levelling_enabled, levelling_announce, levelling_channel, levelling_message, levelling_multiplier FROM guilds WHERE guild_id = $1", + interaction.guild.id, + ) + + bool_emojis = { + True: "🟩", + False: "πŸŸ₯", + None: "⬜", + } + + embed = Embed( + title="Config: Levelling", + description="`🟩 Enabled`\n" + "`πŸŸ₯ Disabled`\n" + "`⬜ Not set`\n\n" + f"```\n" + f"Enabled - {bool_emojis[data[0]]}\n" + f"Announce - {bool_emojis[data[1]]}\n" + f"Announcement Channel - {interaction.guild.get_channel(data[2]) if data[2] else bool_emojis[data[2]] + ' direct'}\n" + f"Announcement Message - {bool_emojis[True] + ' view below' if data[3] else bool_emojis[None]}\n" + f"Banned Roles - soonβ„’\n" + f"XP Multiplier - {data[4]}\n" + f"```", + ) + + if not edit: + return await interaction.response.send_message(embed=embed, view=ConfigLevelling()) + + return await interaction.response.edit_message(embed=embed, view=ConfigLevelling()) + + except Exception as e: + return await interaction.response.send_message(f"An error occurred: {traceback.format_exception(e)}", ephemeral=True) + + +class LevellingConfig(ConfigBase): + group = app_commands.Group(name="config", description="Configure your guild's bot configuration.") + + @group.command(description="Configure your guild's levelling system.") + async def levelling(self, interaction: Interaction): + return await update_message(interaction, edit=False) diff --git a/extensions/config/logging.py b/extensions/config/logging.py new file mode 100644 index 0000000..94e1b30 --- /dev/null +++ b/extensions/config/logging.py @@ -0,0 +1,1024 @@ +import asyncio +import difflib +import typing + +import discord +from discord.ext import commands + +from bot import Korii +from utils.context import CustomContext +from utils.logging import LoggingEventsFlags +from ._base import ConfigBase + + +async def get_wh(channel: discord.TextChannel): + if channel.permissions_for(channel.guild.me).manage_webhooks: + webhooks = await channel.webhooks() + for w in webhooks: + if w.user == channel.guild.me: + return w.url + else: + return (await channel.create_webhook(name="Korii Logging", avatar=await channel.guild.me.avatar.read())).url + else: + raise commands.BadArgument("Cannot create webhook!") + + +class ChannelsView(discord.ui.View): + def __init__(self, ctx): + super().__init__() + self.message: discord.Message + self.ctx = ctx + self.bot: Korii = ctx.bot + self.lock = asyncio.Lock() + self.valid_channels = [ + "default", + "message", + "member", + "join_leave", + "voice", + "server", + ] + + @discord.ui.button(style=discord.ButtonStyle.gray, emoji="β™Ύ", row=0) + async def default(self, interaction: discord.Interaction, button: discord.ui.Button): + if self.lock.locked(): + return await interaction.response.defer() + + async with self.lock: + button.style = discord.ButtonStyle.green + for child in self.children: + child.disabled = True + await interaction.response.edit_message(view=self) + to_delete = [] + m = await self.ctx.send("Please send a channel to change the **Message Events Channel**") + to_delete.append(m) + + def check(msg: discord.Message): + if msg.channel == self.ctx.channel and msg.author == self.ctx.author: + to_delete.append(msg) + return True + return False + + while True: + message: discord.Message = await self.bot.wait_for("message", check=check) + if message.content == "cancel": + break + else: + try: + channel = await commands.TextChannelConverter().convert(self.ctx, message.content) + break + except commands.ChannelNotFound: + pass + + await message.add_reaction("βœ…") + channel_string = message.content + if channel_string.lower() == "cancel": + pass + else: + try: + webhook_url = await get_wh(channel) + await self.bot.pool.execute( + "UPDATE log_channels SET default_channel = $2, default_chid = $3 WHERE guild_id = $1", + self.ctx.guild.id, + webhook_url, + channel.id, + ) + self.bot.update_log("default", webhook_url, message.guild.id) + except commands.ChannelNotFound: + pass + except (commands.BadArgument, discord.Forbidden): + await self.ctx.send( + "Could not create a webhook in that channel!\n" "Do i have **Manage Webhooks** permissions there?" + ) + except discord.HTTPException: + await self.ctx.send("Something went wrong while creating a webhook...") + await self.update_message() + try: + await self.ctx.channel.delete_messages(to_delete) + except: + pass + + @discord.ui.button(style=discord.ButtonStyle.gray, emoji="πŸ“¨", row=0) + async def message(self, interaction: discord.Interaction, button: discord.ui.Button): + if self.lock.locked(): + return await interaction.response.defer() + + async with self.lock: + button.style = discord.ButtonStyle.green + for child in self.children: + child.disabled = True # type: ignore + await interaction.response.edit_message(view=self) + to_delete = [] + m = await self.ctx.send("Please send a channel to change the **Message Events Channel**") + to_delete.append(m) + + def check(message: discord.Message): + if message.channel == self.ctx.channel and message.author == self.ctx.author: + to_delete.append(message) + return True + return False + + while True: + message: discord.Message = await self.bot.wait_for("message", check=check) + if message.content == "cancel": + break + else: + try: + channel = await commands.TextChannelConverter().convert(self.ctx, message.content) + break + except commands.ChannelNotFound: + pass + + await message.add_reaction("βœ…") + channel_string = message.content + if channel_string.lower() == "cancel": + pass + else: + try: + webhook_url = await get_wh(channel) + await self.bot.pool.execute( + "UPDATE log_channels SET message_channel = $2, message_chid = $3 WHERE guild_id = $1", + self.ctx.guild.id, + webhook_url, + channel.id, + ) + self.bot.update_log("message", webhook_url, message.guild.id) + except commands.ChannelNotFound: + pass + except (commands.BadArgument, discord.Forbidden): + await self.ctx.send( + "Could not create a webhook in that channel!\n" "Do i have **Manage Webhooks** permissions there?" + ) + except discord.HTTPException: + await self.ctx.send("Something went wrong while creating a webhook...") + await self.update_message() + try: + await self.ctx.channel.delete_messages(to_delete) + except: + pass + + @discord.ui.button(style=discord.ButtonStyle.gray, emoji="πŸ‘‹", row=1) + async def join_leave(self, interaction: discord.Interaction, button: discord.ui.Button): + if self.lock.locked(): + return await interaction.response.defer() + + async with self.lock: + button.style = discord.ButtonStyle.green + for child in self.children: + child.disabled = True + await interaction.response.edit_message(view=self) + to_delete = [] + m = await self.ctx.send("Please send a channel to change the **Join and Leave Events Channel**") + to_delete.append(m) + + def check(message: discord.Message): + if message.channel == self.ctx.channel and message.author == self.ctx.author: + to_delete.append(message) + return True + return False + + while True: + message: discord.Message = await self.bot.wait_for("message", check=check) + if message.content == "cancel": + break + else: + try: + channel = await commands.TextChannelConverter().convert(self.ctx, message.content) + break + except commands.ChannelNotFound: + pass + + await message.add_reaction("βœ…") + channel_string = message.content + if channel_string.lower() == "cancel": + pass + else: + try: + webhook_url = await get_wh(channel) + await self.bot.pool.execute( + "UPDATE log_channels SET join_leave_channel = $2, join_leave_chid = $3 WHERE guild_id = $1", + self.ctx.guild.id, + webhook_url, + channel.id, + ) + self.bot.update_log("join_leave", webhook_url, message.guild.id) + except commands.ChannelNotFound: + pass + except (commands.BadArgument, discord.Forbidden): + await self.ctx.send( + "Could not create a webhook in that channel!\n" "Do i have **Manage Webhooks** permissions there?" + ) + except discord.HTTPException: + await self.ctx.send("Something went wrong while creating a webhook...") + await self.update_message() + try: + await self.ctx.channel.delete_messages(to_delete) + except: + pass + + @discord.ui.button(style=discord.ButtonStyle.gray, emoji="πŸ‘€", row=0) + async def member(self, interaction: discord.Interaction, button: discord.ui.Button): + if self.lock.locked(): + return await interaction.response.defer() + + async with self.lock: + button.style = discord.ButtonStyle.green + for child in self.children: + child.disabled = True + await interaction.response.edit_message(view=self) + to_delete = [] + m = await self.ctx.send('Please send a channel to change the **Member Events Channel**\nSend "cancel" to cancel') + to_delete.append(m) + + def check(message: discord.Message): + if message.channel == self.ctx.channel and message.author == self.ctx.author: + to_delete.append(message) + return True + return False + + while True: + message: discord.Message = await self.bot.wait_for("message", check=check) + if message.content == "cancel": + break + else: + try: + channel = await commands.TextChannelConverter().convert(self.ctx, message.content) + break + except commands.ChannelNotFound: + pass + + await message.add_reaction("βœ…") + channel_string = message.content + if channel_string.lower() == "cancel": + pass + else: + try: + webhook_url = await get_wh(channel) + await self.bot.pool.execute( + "UPDATE log_channels SET member_channel = $2, member_chid = $3 WHERE guild_id = $1", + self.ctx.guild.id, + webhook_url, + channel.id, + ) + self.bot.update_log("member", webhook_url, message.guild.id) + except commands.ChannelNotFound: + pass + except (commands.BadArgument, discord.Forbidden): + await self.ctx.send( + "Could not create a webhook in that channel!\n" "Do i have **Manage Webhooks** permissions there?" + ) + except discord.HTTPException: + await self.ctx.send("Something went wrong while creating a webhook...") + await self.update_message() + try: + await self.ctx.channel.delete_messages(to_delete) + except: + pass + + @discord.ui.button(style=discord.ButtonStyle.gray, emoji="βš™", row=1) + async def server(self, interaction: discord.Interaction, button: discord.ui.Button): + if self.lock.locked(): + return await interaction.response.defer() + + async with self.lock: + button.style = discord.ButtonStyle.green + for child in self.children: + child.disabled = True + await interaction.response.edit_message(view=self) + to_delete = [] + m = await self.ctx.send("Please send a channel to change the **Server Events Channel**") + to_delete.append(m) + + def check(message: discord.Message): + if message.channel == self.ctx.channel and message.author == self.ctx.author: + to_delete.append(message) + return True + return False + + while True: + message: discord.Message = await self.bot.wait_for("message", check=check) + if message.content == "cancel": + break + else: + try: + channel = await commands.TextChannelConverter().convert(self.ctx, message.content) + break + except commands.ChannelNotFound: + pass + + await message.add_reaction("βœ…") + channel_string = message.content + if channel_string.lower() == "cancel": + pass + else: + try: + webhook_url = await get_wh(channel) + await self.bot.pool.execute( + "UPDATE log_channels SET server_channel = $2, server_chid = $3 WHERE guild_id = $1", + self.ctx.guild.id, + webhook_url, + channel.id, + ) + self.bot.update_log("server", webhook_url, message.guild.id) + except commands.ChannelNotFound: + pass + except (commands.BadArgument, discord.Forbidden): + await self.ctx.send( + "Could not create a webhook in that channel!\n" "Do i have **Manage Webhooks** permissions there?" + ) + except discord.HTTPException: + await self.ctx.send("Something went wrong while creating a webhook...") + await self.update_message() + try: + await self.ctx.channel.delete_messages(to_delete) + except: + pass + + @discord.ui.button(style=discord.ButtonStyle.gray, emoji="πŸŽ™", row=1) + async def voice(self, interaction: discord.Interaction, button: discord.ui.Button): + if self.lock.locked(): + return await interaction.response.defer() + + async with self.lock: + button.style = discord.ButtonStyle.green + for child in self.children: + child.disabled = True + await interaction.response.edit_message(view=self) + to_delete = [] + m = await self.ctx.send( + "Please send a channel to change the **Voice Events Channel**" '\n_Send "cancel" to cancel_' + ) + to_delete.append(m) + + def check(message: discord.Message): + if message.channel == self.ctx.channel and message.author == self.ctx.author: + to_delete.append(message) + return True + return False + + while True: + message: discord.Message = await self.bot.wait_for("message", check=check) + if message.content == "cancel": + break + else: + try: + channel = await commands.TextChannelConverter().convert(self.ctx, message.content) + break + except commands.ChannelNotFound: + pass + + await message.add_reaction("βœ…") + channel_string = message.content + if channel_string.lower() == "cancel": + pass + else: + try: + webhook_url = await get_wh(channel) + await self.bot.pool.execute( + "UPDATE log_channels SET voice_channel = $2, voice_chid = $3 WHERE guild_id = $1", + self.ctx.guild.id, + webhook_url, + channel.id, + ) + self.bot.update_log("voice", webhook_url, message.guild.id) + except commands.ChannelNotFound: + pass + except (commands.BadArgument, discord.Forbidden): + await self.ctx.send( + "Could not create a webhook in that channel!\n" "Do i have **Manage Webhooks** permissions there?" + ) + except discord.HTTPException: + await self.ctx.send("Something went wrong while creating a webhook...") + await self.update_message() + try: + await self.ctx.channel.delete_messages(to_delete) + except: + pass + + @discord.ui.button(style=discord.ButtonStyle.red, label="stop", row=2) + async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button): + if self.lock.locked(): + return await interaction.response.send_message("Can't do that while waiting for a message!", ephemeral=True) + await interaction.response.defer() + await self.on_timeout() + + async def on_timeout(self) -> None: + for child in self.children: + child.disabled = True + child.style = discord.ButtonStyle.grey + await self.message.edit(view=self) + self.stop() + + async def interaction_check(self, interaction: discord.Interaction) -> bool: + if interaction.user == self.ctx.author: + return True + await interaction.response.send_message(f"This menu belongs to **{self.ctx.author}**, sorry! πŸ’–", ephemeral=True) + return False + + async def update_message(self, edit: bool = True): + channels = await self.bot.pool.fetchrow("SELECT * FROM log_channels WHERE guild_id = $1", self.ctx.guild.id) + embed = discord.Embed( + title="Logging Channels", + colour=discord.Colour.blurple(), + timestamp=self.ctx.message.created_at, + ) + default = self.bot.get_channel(channels["default_chid"] or 1) + message = self.bot.get_channel(channels["message_chid"] or 1) + join_leave = self.bot.get_channel(channels["join_leave_chid"] or 1) + member = self.bot.get_channel(channels["member_chid"] or 1) + server = self.bot.get_channel(channels["server_chid"] or 1) + voice = self.bot.get_channel(channels["voice_chid"] or 1) + embed.description = ( + f"**β™Ύ Default channel:** {default.mention}" + f"\n**πŸ“¨ Message events:** {message.mention if message else ''}" + f"\n**πŸ‘‹ Joining and Leaving:** {join_leave.mention if join_leave else ''}" + f"\n**πŸ‘€ Member events:** {member.mention if member else ''}" + f"\n**βš™ Server events:** {server.mention if server else ''}" + f"\n**πŸŽ™ Voice events:** {voice.mention if voice else ''}" + f"\n" + f"\n_Channels not shown here will be_" + f"\n_delivered to the default channel._" + ) + loggings = self.bot.guild_loggings[self.ctx.guild.id] + enabled = [x for x, y in set(loggings) if y is True] + embed.set_footer(text=f"{len(enabled)}/{len(set(loggings))} events enabled.") + for child in self.children: + child.disabled = False + if child.row < 2: + child.style = discord.ButtonStyle.grey + else: + child.style = discord.ButtonStyle.red + if edit: + await self.message.edit(embed=embed, view=self) + else: + return await self.ctx.send(embed=embed, view=self) + + async def start(self): + self.message = await self.update_message(edit=False) + + +# noinspection PyProtocol +class ValidEventConverter(commands.Converter): + async def convert(self, ctx: CustomContext, argument: str): + new = argument.replace("-", "_").replace(" ", "_").lower() + all_events = dict(LoggingEventsFlags.all()) + if new in all_events: + return new + maybe_events = difflib.get_close_matches(argument, all_events) + if maybe_events: + c = await ctx.confirm( + f"Did you mean... **`{maybe_events[0]}`**?", + delete_after_confirm=True, + delete_after_timeout=False, + buttons=( + ("β˜‘", None, discord.ButtonStyle.blurple), + ("πŸ—‘", None, discord.ButtonStyle.gray), + ), + ) + if c: + return maybe_events[0] + elif c is None: + raise errors.NoHideout() + raise commands.BadArgument(f"`{argument[0:100]}` is not a valid logging event.") + + +styles = { + True: discord.ButtonStyle.green, + False: discord.ButtonStyle.gray, + None: discord.ButtonStyle.grey, +} + + +class EventToggle(discord.ui.Button["AllEvents"]): + def __init__(self, event: str, enabled: bool): + super().__init__( + label=event.title().replace("_", " ").replace("guild", "server"), + style=styles[enabled], + ) + self.emoji = CustomContext.default_tick(enabled) + self.event = event + self.enabled = enabled + + async def callback(self, interaction: discord.Interaction): + self.enabled = not self.enabled + q = await self.view.ctx.bot.pool.fetch( + f"UPDATE logging_events SET {self.event} = $1 WHERE guild_id = $2 RETURNING {self.event}", + self.enabled, + self.view.ctx.guild.id, + ) + if not q: + await interaction.response.send_message("For some reason logging is not set up anymore.") + self.view.stop() + return + self.style = styles[self.enabled] + self.emoji = self.view.ctx.bot.tick(self.enabled) + setattr( + self.view.ctx.bot.guild_loggings[self.view.ctx.guild.id], + self.event, + self.enabled, + ) + setattr(self.view.events, self.event, self.enabled) + await interaction.response.edit_message(embed=await self.view.async_update_event(), view=self.view) + + +class AllEvents(discord.ui.View): + def __init__(self, ctx: CustomContext, events: LoggingEventsFlags): + super().__init__() + self.ctx = ctx + self.events = events + + async def start(self): + self.prepare() + embed = await self.async_update_event() + await self.ctx.send(embed=embed, view=self) + + def prepare(self): + events = { + "message": "πŸ“¨", + "join_leave": "πŸ‘‹", + "member": "πŸ‘€", + "voice": "πŸŽ™", + "server": "βš™", + } + for event, emoji in events.items(): + self.select_category.options.append( + discord.SelectOption(label=f"{event.title()} events", value=event, emoji=emoji) + ) + option = "message" + options: typing.List[str, bool] = [o for o, v in getattr(LoggingEventsFlags, option)() if v is True] + opts = {k: v for k, v in self.events if k in options} + for option, value in opts.items(): + self.add_item(EventToggle(option, value)) + + def update_embed(self): + events = self.events + ctx = self.ctx + embed = discord.Embed( + title="Logging events for this server", + colour=discord.Colour.blurple(), + timestamp=ctx.message.created_at, + ) + message_events = [ + ctx.default_tick(events.message_delete, "Message Delete"), + ctx.default_tick(events.message_edit, "Message Edit"), + ctx.default_tick(events.message_purge, "Message Purge"), + ] + embed.add_field(name="Message Events", value="\n".join(message_events)) + join_leave_events = [ + ctx.default_tick(events.member_join, "Member Join"), + ctx.default_tick(events.member_leave, "Member Leave"), + ] + subtract = 0 + if not ctx.me.guild_permissions.manage_channels: + if events.invite_create: + join_leave_events.append("⚠ Invite Create" "\nβ•° Manage Channels") + subtract += 1 + else: + join_leave_events.append(ctx.default_tick(events.invite_create, "Invite Create")) + if events.invite_delete: + join_leave_events.append("⚠ Invite Delete" "\nβ•° Manage Channels") + subtract += 1 + else: + join_leave_events.append(ctx.default_tick(events.invite_delete, "Invite Create")) + else: + join_leave_events.append(ctx.default_tick(events.invite_create, "Invite Create")) + join_leave_events.append(ctx.default_tick(events.invite_delete, "Invite Delete")) + embed.add_field(name="Join Leave Events", value="\n".join(join_leave_events)) + member_update_evetns = [ + ctx.default_tick(events.member_update, "Member Update"), + ctx.default_tick(events.user_update, "User Update"), + ctx.default_tick(events.user_ban, "User Ban"), + ctx.default_tick(events.user_unban, "User Unban"), + ] + embed.add_field(name="Member Events", value="\n".join(member_update_evetns)) + voice_events = [ + ctx.default_tick(events.voice_join, "Voice Join"), + ctx.default_tick(events.voice_leave, "Voice Leave"), + ctx.default_tick(events.voice_move, "Voice Move"), + ctx.default_tick(events.voice_mod, "Voice Mod"), + ctx.default_tick(events.stage_open, "Stage Open"), + ctx.default_tick(events.stage_close, "Stage Close"), + ] + embed.add_field(name="Voice Events", value="\n".join(voice_events)) + server_events = [ + ctx.default_tick(events.channel_create, "Channel Create"), + ctx.default_tick(events.channel_delete, "Channel Delete"), + ctx.default_tick(events.channel_edit, "Channel Edit"), + ctx.default_tick(events.role_create, "Role Create"), + ctx.default_tick(events.role_delete, "Role Delete"), + ctx.default_tick(events.role_edit, "Role Edit"), + ctx.default_tick(events.server_update, "Server Update"), + ctx.default_tick(events.emoji_create, "Emoji Create"), + ctx.default_tick(events.emoji_delete, "Emoji Delete"), + ctx.default_tick(events.emoji_update, "Emoji Update"), + ctx.default_tick(events.sticker_create, "Sticker Create"), + ctx.default_tick(events.sticker_delete, "Sticker Delete"), + ctx.default_tick(events.sticker_update, "Sticker Update"), + ] + embed.add_field(name="Server Events", value="\n".join(server_events)) + embed.description = "βœ… Enabled β€’ ❌ Disabled β€’ ⚠ Missing Perms" + enabled = [x for x, y in set(events) if y is True] + amount_enabled = len(enabled) - subtract + embed.set_footer(text=f"{amount_enabled}/{len(set(events))} events enabled.") + return embed + + async def async_update_event(self): + return await self.ctx.bot.loop.run_in_executor(None, self.update_embed) + + @discord.ui.select(placeholder="Select an event category to view") + async def select_category(self, interaction: discord.Interaction, select: discord.ui.Select): + option = select.values[0] + options: typing.List[str, bool] = [o for o, v in getattr(LoggingEventsFlags, option)() if v is True] + opts = {k: v for k, v in self.events if k in options} # type: ignore + self.clear_items() + self.add_item(select) + self.add_item(self.delete) + for option, value in opts.items(): + self.add_item(EventToggle(option, value)) + await interaction.response.edit_message(embed=await self.async_update_event(), view=self) + + @discord.ui.button(label="Delete", style=discord.ButtonStyle.red, emoji="πŸ—‘", row=4) + async def delete(self, interaction: discord.Interaction, _): + await interaction.message.delete() + self.stop() + + async def interaction_check(self, interaction: discord.Interaction) -> bool: + return interaction.user == self.ctx.author and interaction.user.guild_permissions.manage_guild + + +class Logging(ConfigBase): + @commands.group(aliases=["logging", "logger"]) + @commands.max_concurrency(1, commands.BucketType.guild) + async def log(self, ctx: CustomContext): + """Base command to manage the logging events. + + Run this command without sub-commands to show more detailed information on the logging module""" + if ctx.invoked_subcommand is None: + embed = discord.Embed( + title="Korii Logging Module", + colour=discord.Colour.yellow(), + description="**What is this?**\n" + "The Logging module is a fully customizable logger for different server events. " + "It can be configured to log up to 30 unique events, and for those events to be " + "delivered into 5 different channels.\n" + "**Available commands:**\n" + f"\n`{ctx.clean_prefix}log enable ` Enables logging for this server." + f"\n`{ctx.clean_prefix}log disable` Disables logging for this server." + f"\n`{ctx.clean_prefix}log channels` Shows the current channel settings." + f"\n`{ctx.clean_prefix}log edit-channels` Modifies the log channels (interactive menu)." + f"\n`{ctx.clean_prefix}log all-events` Shows all events, disabled and enabled." + f"\n`{ctx.clean_prefix}log enable-event ` Enables a specific event from the list." + f"\n`{ctx.clean_prefix}log disable-event ` Disables a specific event from the list." + f"\n`{ctx.clean_prefix}log auto-setup` Creates a logging category with different channels." + f"\n" + f"\nFor more info on a specific command, run the `help` command with it, E.G:" + f"\n`sb!help log enable-event`", + ) + await ctx.send(embed=embed) + + @log.command( + name="enable", + aliases=["set-default"], + ) + @commands.has_permissions(manage_guild=True) + @commands.max_concurrency(1, commands.BucketType.guild) + async def log_enable(self, ctx: CustomContext, channel: discord.TextChannel): + """Enables the logging module to deliver to one channel. + + If logging is already enabled, it will set the default logging channel to the one specified. + _Note: This will not modify your enabled/disabled events, if any._""" + if ctx.guild.id in self.bot.log_channels: + raise commands.BadArgument("This server already has a logging enabled.") + if not channel.permissions_for(ctx.me).manage_webhooks and not channel.permissions_for(ctx.me).send_messages: + raise commands.BadArgument(f"I'm missing the Manage Webhooks permission in {channel.mention}") + await ctx.typing() + + try: + webhooks = await channel.webhooks() + except (discord.Forbidden, discord.HTTPException): + raise commands.BadArgument( + f"I was unable to get the list of webhooks in {channel.mention}. (Missing Permissions - Manage Webhooks)" + ) + for w in webhooks: + if w.user == self.bot.user: + webhook_url = w.url + break + else: + if len(webhooks) == 10: + raise commands.BadArgument(f"{channel.mention} has already the max number of webhooks! (10 webhooks)") + try: + w = await channel.create_webhook( + name="Korii logging", + avatar=await ctx.me.avatar.read(), + reason="Korii logging", + ) + webhook_url = w.url + except discord.Forbidden: + raise commands.BadArgument( + f"I couldn't create a webhook in {channel.mention}(Missing Permissions - Manage Webhooks)" + ) + except discord.HTTPException: + raise commands.BadArgument( + f"There was an unexpected error while creating a webhook in {channel.mention} (HTTP exception) - Perhaps try again?" + ) + await self.bot.pool.execute( + "INSERT INTO guilds (guild_id) VALUES ($1) " "ON CONFLICT (guild_id) DO NOTHING", + ctx.guild.id, + ) + await self.bot.pool.execute( + "INSERT INTO log_channels(guild_id, default_channel, default_chid) VALUES ($1, $2, $3) " + "ON CONFLICT (guild_id) DO UPDATE SET default_channel = $2, default_chid = $3", + ctx.guild.id, + webhook_url, + channel.id, + ) + await self.bot.pool.execute( + "INSERT INTO logging_events(guild_id) VALUES ($1) ON CONFLICT (guild_id) DO NOTHING", + ctx.guild.id, + ) + self.bot.guild_loggings[ctx.guild.id] = LoggingEventsFlags.all() + try: + self.bot.log_channels[ctx.guild.id]._replace(default=webhook_url) + except KeyError: + self.bot.log_channels[ctx.guild.id] = self.bot.log_webhooks( + default=webhook_url, + voice=None, + message=None, + member=None, + server=None, + join_leave=None, + ) + await ctx.send( + f"Successfully set the logging channel to {channel.mention}" + f"\n_see `{ctx.clean_prefix}help log` for more customization commands!_" + ) + + @log.command(name="disable", aliases=["disable-logging"]) + @commands.has_permissions(manage_guild=True) + @commands.max_concurrency(1, commands.BucketType.guild) + async def log_disable(self, ctx: CustomContext): + """Disables logging for this server, and deletes all the bots logging webhooks.""" + if ctx.guild.id not in self.bot.log_channels: + raise commands.BadArgument("Logging is not enabled for this server!") + confirm = await ctx.confirm( + "**Are you sure you want to disable logging?**" + "\nThis will overwrite and disable **all** delivery channels, and delete all my webhooks.", + delete_after_confirm=True, + delete_after_timeout=False, + ) + if not confirm: + return + async with ctx.typing(): + try: + self.bot.log_channels.pop(ctx.guild.id) + except KeyError: + pass + channels = await self.bot.pool.fetchrow("DELETE FROM log_channels WHERE guild_id = $1 RETURNING *", ctx.guild.id) + + channel_ids = ( + channels["default_chid"], + channels["message_chid"], + channels["join_leave_chid"], + channels["member_chid"], + channels["voice_chid"], + channels["server_chid"], + ) + failed = 0 + success = 0 + for channel in channel_ids: + channel = self.bot.get_channel(channel) + if isinstance(channel, discord.TextChannel): + try: + webhooks = await channel.webhooks() + for webhook in webhooks: + if webhook.user == ctx.me: + await webhook.delete() + success += 1 + except (discord.Forbidden, discord.HTTPException, discord.NotFound): + failed += 1 + await ctx.send( + "βœ… **Successfully unset all logging channels!**" + f"\n_Deleted {success} webhooks. {failed} failed to delete._" + ) + + @log.command(name="channels") + @commands.has_permissions(manage_guild=True) + async def log_channels(self, ctx: CustomContext): + """Shows this server's logging channels""" + if ctx.guild.id not in self.bot.log_channels: + raise commands.BadArgument("This server doesn't have logging enabled.") + channels = await self.bot.pool.fetchrow("SELECT * FROM log_channels WHERE guild_id = $1", ctx.guild.id) + embed = discord.Embed( + title="Logging Channels", + colour=discord.Colour.blurple(), + timestamp=discord.utils.utcnow(), + ) + default = self.bot.get_channel(channels["default_chid"] or 1) + message = self.bot.get_channel(channels["message_chid"] or 1) + join_leave = self.bot.get_channel(channels["join_leave_chid"] or 1) + member = self.bot.get_channel(channels["member_chid"] or 1) + server = self.bot.get_channel(channels["server_chid"] or 1) + voice = self.bot.get_channel(channels["voice_chid"] or 1) + embed.description = ( + f"**Default channel:** {default.mention}" + f"\n**Message events:** {message.mention if message else ''}" + f"\n**Joining and Leaving:** {join_leave.mention if join_leave else ''}" + f"\n**Member events:** {member.mention if member else ''}" + f"\n**Server events:** {server.mention if server else ''}" + f"\n**Voice events:** {voice.mention if voice else ''}" + f"\n" + f"\n_Channels not shown here will be_" + f"\n_delivered to the default channel._" + ) + loggings = self.bot.guild_loggings[ctx.guild.id] + enabled = [x for x, y in set(loggings) if y is True] + embed.set_footer(text=f"{len(enabled)}/{len(set(loggings))} events enabled.") + await ctx.send(embed=embed) + + @log.command(name="disable-event", aliases=["disable_event", "de"]) + @commands.has_permissions(manage_guild=True) + @commands.max_concurrency(1, commands.BucketType.guild) + async def log_disable_event(self, ctx, *, event: ValidEventConverter): + """**Disables a logging event, which can be one of the following:** + `message_delete`, `message_purge`, `message_edit`, `member_join`, `member_leave`, `member_update`, `user_ban`, `user_unban`, `user_update`, `invite_create`, `invite_delete`, `voice_join`, `voice_leave`, `voice_move`, `voice_mod`, `emoji_create`, `emoji_delete`, `emoji_update`, `sticker_create`, `sticker_delete`, `sticker_update`, `server_update`, `stage_open`, `stage_close`, `channel_create`, `channel_delete`, `channel_edit`, `role_create`, `role_delete`, `role_edit` + + You can either use underscore `_` or dash `-` when specifying the event. + _Note that the command will attempt to auto-complete to the closest match, if not specified._ + """ + if ctx.guild.id not in self.bot.log_channels: + raise commands.BadArgument("This server doesn't have logging enabled.") + arg = getattr(self.bot.guild_loggings[ctx.guild.id], event, None) + if arg is False: + raise commands.BadArgument(f'❌ **|** **{str(event).replace("_", " ").title()} Events** are already disabled!') + await self.bot.pool.execute( + f"UPDATE logging_events SET {event} = $2 WHERE guild_id = $1", + ctx.guild.id, + False, + ) + setattr(self.bot.guild_loggings[ctx.guild.id], event, False) + await ctx.send(f'βœ… **|** Successfully disabled **{str(event).replace("_", " ").title()} Events**') + + @log.command(name="enable-event", aliases=["enable_event", "ee"]) + @commands.has_permissions(manage_guild=True) + @commands.max_concurrency(1, commands.BucketType.guild) + async def log_enable_event(self, ctx: CustomContext, *, event: ValidEventConverter): + """**Enables a logging event, which can be one of the following:** + `message_delete`, `message_purge`, `message_edit`, `member_join`, `member_leave`, `member_update`, `user_ban`, `user_unban`, `user_update`, `invite_create`, `invite_delete`, `voice_join`, `voice_leave`, `voice_move`, `voice_mod`, `emoji_create`, `emoji_delete`, `emoji_update`, `sticker_create`, `sticker_delete`, `sticker_update`, `server_update`, `stage_open`, `stage_close`, `channel_create`, `channel_delete`, `channel_edit`, `role_create`, `role_delete`, `role_edit` + + You can either use underscore `_` or dash `-` when specifying the event. + _Note that the command will attempt to auto-complete to the closest match, if not specified._ + """ + if ctx.guild.id not in self.bot.log_channels: + raise commands.BadArgument("This server doesn't have logging enabled.") + arg = getattr(self.bot.guild_loggings[ctx.guild.id], event, None) + if arg is True: + raise commands.BadArgument(f'❌ **|** **{str(event).replace("_", " ").title()} Events** are already enabled!') + await self.bot.pool.execute( + f"UPDATE logging_events SET {event} = $2 WHERE guild_id = $1", + ctx.guild.id, + True, + ) + setattr(self.bot.guild_loggings[ctx.guild.id], event, True) + await ctx.send(f'βœ… **|** Successfully enabled **{str(event).replace("_", " ").title()} Events**') + + @log.command( + name="edit-channels", + aliases=["edit_channels", "ec"], + preview="https://i.imgur.com/FO9e9VC.gif", + ) + @commands.has_permissions(manage_guild=True) + @commands.max_concurrency(1, commands.BucketType.guild) + async def log_edit_channels(self, ctx): + """Shows an interactive menu to modify the server's logging channels.""" + if ctx.guild.id not in self.bot.log_channels: + raise commands.BadArgument("This server doesn't have logging enabled.") + view = ChannelsView(ctx) + await view.start() + await view.wait() + + @commands.max_concurrency(1, commands.BucketType.guild) + @log.command(name="events", aliases=["all-events", "ae"]) + @commands.has_permissions(manage_guild=True) + async def log_all_events(self, ctx: CustomContext): + if ctx.guild.id not in self.bot.log_channels: + raise commands.BadArgument("This server doesn't have logging enabled.") + await ctx.typing() + events = self.bot.guild_loggings[ctx.guild.id] + view = AllEvents(ctx, events) + await view.start() + await view.wait() + + @log.command(name="auto-setup") + @commands.has_permissions(administrator=True) + @commands.max_concurrency(1, commands.BucketType.guild) + @commands.bot_has_guild_permissions(manage_channels=True, manage_webhooks=True) + async def log_auto_setup(self, ctx: CustomContext): + """Creates a Logging category, with channels for each event to be delivered. + The channels would be the following (inside a logging category): + `#join-leave-log` + `#message-log` + `#voice-log` + `#member-log` + `#server-log` + """ + if ctx.guild in self.bot.log_channels: + raise commands.BadArgument("This server already has Logging Set up!") + c = await ctx.confirm( + "**Do you want to proceed?**" + "\nThis command will set up logging for you," + "\nBy creating the followinc category:" + "\n" + f"\n`#logging` (category)" + f"\n- `#join-leave-log`" + f"\n- `#message-log`" + f"\n- `#voice-log`" + f"\n- `#member-log`", + delete_after_timeout=False, + delete_after_cancel=False, + delete_after_confirm=True, + ) + if not c: + return + async with ctx.typing(): + try: + over = { + ctx.guild.default_role: discord.PermissionOverwrite(read_messages=False), + ctx.me: discord.PermissionOverwrite( + read_messages=True, + send_messages=True, + manage_channels=True, + manage_webhooks=True, + ), + } + avatar = await ctx.me.display_avatar.read() + cat = await ctx.guild.create_category(name="logging", overwrites=over) + join_leave_channel = await cat.create_text_channel(name="join-leave-log") + join_leave_webhook = await join_leave_channel.create_webhook(name="Korii logging", avatar=avatar) + message_channel = await cat.create_text_channel(name="message-log") + message_webhook = await message_channel.create_webhook(name="Korii logging", avatar=avatar) + voice_channel = await cat.create_text_channel(name="voice-log") + voice_webhook = await voice_channel.create_webhook(name="Korii logging", avatar=avatar) + member_channel = await cat.create_text_channel(name="member-log") + member_webhook = await member_channel.create_webhook(name="Korii logging", avatar=avatar) + server_channel = await cat.create_text_channel(name="server-log") + server_webhook = await server_channel.create_webhook(name="Korii logging", avatar=avatar) + self.bot.log_channels[ctx.guild.id] = self.bot.log_webhooks( + join_leave=join_leave_webhook.url, + server=server_webhook.url, + default=server_webhook.url, + message=message_webhook.url, + member=member_webhook.url, + voice=voice_webhook.url, + ) + self.bot.guild_loggings[ctx.guild.id] = LoggingEventsFlags.all() + await self.bot.pool.execute( + "INSERT INTO guilds (guild_id) VALUES ($1) " "ON CONFLICT (guild_id) DO NOTHING", + ctx.guild.id, + ) + await self.bot.pool.execute( + """ + INSERT INTO log_channels(guild_id, default_channel, default_chid, message_channel, message_chid, + join_leave_channel, join_leave_chid, member_channel, member_chid, + voice_channel, voice_chid, server_channel, server_chid) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) + ON CONFLICT (guild_id) DO UPDATE SET + default_channel = $2, default_chid = $3, message_channel = $4, message_chid = $5, + join_leave_channel = $6, join_leave_chid = $7, member_channel = $8, member_chid = $9, + voice_channel = $10, voice_chid = $11, server_channel = $12, server_chid = $13; """, + ctx.guild.id, + server_webhook.url, + server_channel.id, + message_webhook.url, + message_channel.id, + join_leave_webhook.url, + join_leave_channel.id, + member_webhook.url, + member_channel.id, + voice_webhook.url, + voice_channel.id, + server_webhook.url, + server_channel.id, + ) + await self.bot.pool.execute( + "INSERT INTO logging_events(guild_id) VALUES ($1)" "ON CONFLICT (guild_id) DO NOTHING", + ctx.guild.id, + ) + try: + embed = discord.Embed( + title="Successfully set up!", + colour=discord.Colour.blurple(), + description=f"{join_leave_channel.mention}" + f"\n{message_channel.mention}" + f"\n{voice_channel.mention}" + f"\n{server_channel.mention}", + ) + await ctx.send(embed=embed, mention_author=True) + except (discord.Forbidden, discord.HTTPException): + pass + except discord.Forbidden: + await ctx.send( + "For some reason, I didn't have the necessary permissions to do that." + "\nTry assigning me a role with `Administrator` permissions" + ) + except discord.HTTPException: + await ctx.send("Something went wrong, ups!") \ No newline at end of file diff --git a/extensions/logging/__init__.py b/extensions/logging/__init__.py new file mode 100644 index 0000000..4082821 --- /dev/null +++ b/extensions/logging/__init__.py @@ -0,0 +1,13 @@ +from .join_leave import JoinLeaveLogs +from .member import MemberLogs +from .message import MessageLogs +from .server import ServerLogs +from .voice import VoiceLogs + + +class Logging(JoinLeaveLogs, MemberLogs, MessageLogs, ServerLogs, VoiceLogs): + pass + + +async def setup(bot): + await bot.add_cog(Logging(bot)) diff --git a/extensions/logging/_base.py b/extensions/logging/_base.py new file mode 100644 index 0000000..1678c82 --- /dev/null +++ b/extensions/logging/_base.py @@ -0,0 +1,199 @@ +import asyncio +import logging +from collections import namedtuple + +import discord +import typing +from discord.ext import commands, tasks + +from bot import Korii + +guild_channels = typing.Union[ + discord.TextChannel, + discord.VoiceChannel, + discord.CategoryChannel, + discord.TextChannel, +] + +invalidated_webhook = ( + "https://canary.discord.com/api/webhooks/000000000000000000/_LQ1qItzrwhNj47TZEagmEgnjBJhCeLIIAE48M61S3XojN5bQuq8JM_kjv4cwCglYJlp" +) + + +class LoggingBase(commands.Cog): + def __init__(self, bot): + self.bot: Korii = bot + _nt_send_to = namedtuple("send_to", ["default", "message", "member", "join_leave", "voice", "server"]) + + self.send_to = _nt_send_to( + default="default", + message="message", + member="member", + join_leave="join_leave", + server="server", + voice="voice", + ) + self.send_lock = asyncio.Lock() + self.deliver_logs.start() + + def cog_unload(self) -> None: + self.deliver_logs.cancel() + + async def log( + self, + embed, + *, + guild: typing.Union[discord.Guild, discord.PartialInviteGuild, int], + send_to: str = "default", + ): + async with self.send_lock: + guild_id = getattr(guild, "id", guild) + if guild_id in self.bot.log_channels: + self.bot.log_cache[guild_id][send_to].append(embed) + + @tasks.loop(seconds=3) + async def deliver_logs(self): + async with self.send_lock: + try: + for guild_id, webhooks in self.bot.log_channels.items(): + for deliver_type in self.bot.log_cache[guild_id].keys(): + embeds = self.bot.log_cache[guild_id][deliver_type][:10] + self.bot.log_cache[guild_id][deliver_type] = self.bot.log_cache[guild_id][deliver_type][10:] + webhook_url = getattr(webhooks, deliver_type, None) + if embeds: + if webhook_url: + webhook = discord.Webhook.from_url( + webhook_url or invalidated_webhook, + bot_token=self.bot.http.token, + session=self.bot.session, + ) + + try: + await webhook.send(embeds=embeds) + + except discord.NotFound: + self.bot.loop.create_task( + self.create_and_deliver( + embeds=embeds, + deliver_type=deliver_type, + guild_id=guild_id, + ) + ) + await asyncio.sleep(1) + + except Exception as e: + print("Error during task!") + print(e) + + else: + deliver_type = self.send_to.default + webhook_url = webhooks.default + webhook = discord.Webhook.from_url( + webhook_url or invalidated_webhook, + bot_token=self.bot.http.token, + session=self.bot.session, + ) + try: + await webhook.send(embeds=embeds) + + except discord.NotFound: + self.bot.loop.create_task( + self.create_and_deliver( + embeds=embeds, + deliver_type=deliver_type, + guild_id=guild_id, + ) + ) + await asyncio.sleep(1) + + except Exception as e: + print("Error during task!") + print(e) + + except Exception as e: + if isinstance(e, RuntimeError) and str(e) == str(RuntimeError("dictionary changed size during iteration")): + return + try: + await self.bot.on_error("channel_logs") + except Exception as e: + logging.error("something happened while task was running", exc_info=e) + + @deliver_logs.before_loop + async def wait(self): + await self.bot.wait_until_ready() + + async def create_and_deliver(self, embeds: typing.List[discord.Embed], deliver_type: str, guild_id: int): + if deliver_type not in [ + "default", + "message", + "member", + "join_leave", + "voice", + "server", + ]: + raise AttributeError("Improper delivery type passed") + + channel_ids = await self.bot.pool.fetchrow(f"SELECT * FROM log_channels WHERE guild_id = $1", guild_id) + if not channel_ids: + return + + channel_id: int = channel_ids[f"{deliver_type}_chid"] + channel: discord.TextChannel = self.bot.get_channel(channel_id) # type: ignore + + if not channel and deliver_type != self.send_to.default: + for e in embeds: + e.footer.text = ( + e.footer.text + or "" + f"\nCould not deliver to the {deliver_type} channel. Sent here instead!\n" + f"Please set or set the {deliver_type} channel. do `db.help log` for info." + ) + await self.log(e, guild=guild_id, send_to=self.send_to.default) + return + + if not channel: + return + + if channel.permissions_for(channel.guild.me).manage_webhooks: + webhooks_list = await channel.webhooks() + for w in webhooks_list: + if w.user == self.bot.user: + webhook = w + break + + else: + webhook = await channel.create_webhook( + name="Korii Logging", + avatar=await self.bot.user.display_avatar.read(), + reason="Korii Logging channel", + ) + + await self.bot.pool.execute(f"UPDATE log_channels SET {deliver_type}_channel = $1", webhook.url) + if deliver_type == "default": + self.bot.log_channels[channel.guild.id]._replace(default=webhook.url) + elif deliver_type == "message": + self.bot.log_channels[channel.guild.id]._replace(message=webhook.url) + elif deliver_type == "member": + self.bot.log_channels[channel.guild.id]._replace(member=webhook.url) + elif deliver_type == "join_leave": + self.bot.log_channels[channel.guild.id]._replace(join_leave=webhook.url) + elif deliver_type == "voice": + self.bot.log_channels[channel.guild.id]._replace(voice=webhook.url) + elif deliver_type == "server": + self.bot.log_channels[channel.guild.id]._replace(server=webhook.url) + await webhook.send(embeds=embeds) + + elif not deliver_type != self.send_to.default: + for e in embeds: + e.footer.text = ( + e.footer.text + or "" + f"\nCould not deliver to the {deliver_type} channel. Sent here instead!\n" + f"Please give me **Manage Webhooks** permissions in #{channel.name}." + ) + await self.log(e, guild=guild_id, send_to=self.send_to.default) + + else: + await channel.send( + f"An error occurred delivering the message to {channel.mention}!" + f"\nPlease check if I have the **Manage Webhooks** permissions in all the log channels!" + f"\nAnd also check that {channel.mention} has less than 10 webhooks, **or** it already has one webhook owned by {channel.guild.me.mention}" + ) diff --git a/extensions/logging/join_leave.py b/extensions/logging/join_leave.py new file mode 100644 index 0000000..45c4039 --- /dev/null +++ b/extensions/logging/join_leave.py @@ -0,0 +1,84 @@ +import discord +import typing +from discord.ext import commands + +from ._base import LoggingBase + +class JoinLeaveLogs(LoggingBase): + @commands.Cog.listener("on_invite_update") + async def logging_on_invite_update(self, member: discord.Member, invite: typing.Optional[discord.Invite]): + if member.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[member.guild.id].member_join: + return + + embed = discord.Embed(title="Member joined", colour=discord.Colour.green(), timestamp=discord.utils.utcnow(), + description=f"{member.mention} | {member.guild.member_count} to join." + f"\n**Created:** {discord.utils.format_dt(member.created_at)} ({discord.utils.format_dt(member.created_at, style='R')})") + + embed.set_author(name=str(member), icon_url=member.display_avatar.url) + + if invite: + embed.add_field(name="Invited by:", + value=f"{discord.utils.escape_markdown(str(invite.inviter))} ({invite.inviter.mention if invite.inviter else 'Unknown'})" + f"\n**Using invite code:** [{invite.code}]({invite.url})" + f"\n**Expires:** {discord.utils.format_dt(invite.expires_at) if invite.expires_at else 'Never'}" + f"\n**Uses:** {invite.uses}/{invite.max_uses if invite.max_uses > 0 else 'unlimited'}", inline=False) + + await self.log(embed, guild=member.guild, send_to=self.send_to.join_leave) + + @commands.Cog.listener("on_member_remove") + async def logging_on_member_remove(self, member: discord.Member): + if member.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[member.guild.id].member_leave: + return + + embed = discord.Embed(color=discord.Colour(0xF4D58C), title="Member left", + description=f"**Created at:** {discord.utils.format_dt(member.created_at)} ({discord.utils.format_dt(member.created_at, 'R')})" + f"\n**Joined at:** {discord.utils.format_dt(member.joined_at) if member.joined_at else 'N/A'}" + f"({discord.utils.format_dt(member.joined_at, 'R') if member.joined_at else'"N/A'})" + f"\n**Nickname:** {member.nick}") + + embed.set_author(name=str(member), icon_url=(member.avatar or member.default_avatar).url) + roles = [r for r in member.roles if not r.is_default()] + + if roles: + embed.add_field(name="Roles", value=", ".join([r.mention for r in roles]), inline=True) + + await self.log(embed, guild=member.guild, send_to=self.send_to.join_leave) + + @commands.Cog.listener("on_invite_create") + async def logging_on_invite_create(self, invite: discord.Invite): + assert invite.guild + + if invite.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[invite.guild.id].invite_create: + return + + embed = discord.Embed(title="Invite Created", colour=discord.Colour.fuchsia(), timestamp=discord.utils.utcnow(), + description=f"**Inviter:** {invite.inviter}{f' ({invite.inviter.id})' if invite.inviter else ''}\n" + f"**Invite Code:** [{invite.code}]({invite.url})\n" + f"**Expires:** {discord.utils.format_dt(invite.expires_at, style='R') if invite.expires_at else 'Never'}\n" + f"**Max Uses:** {invite.max_uses if invite.max_uses > 0 else 'Unlimited'}\n" + f"**Channel:** {invite.channel}\n" + f"**Grants Temporary Membership:** {self.bot.tick(invite.temporary)}") + + if invite.inviter: + embed.set_author(icon_url=invite.inviter.display_avatar.url, name=str(invite.inviter)) + + embed.set_footer(text=f"Invite ID: {invite.id}") + + await self.log(embed, guild=invite.guild.id, send_to=self.send_to.join_leave) + + @commands.Cog.listener("on_invite_delete") + async def logging_on_invite_delete(self, invite: discord.Invite): + assert invite.guild + + if invite.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[invite.guild.id].invite_delete: + return + + embed = discord.Embed(title="Invite Deleted", colour=discord.Colour.fuchsia(), timestamp=discord.utils.utcnow(), + description=f"**Inviter:** {invite.inviter}{f' ({invite.inviter.id})' if invite.inviter else ''}\n" + f"**Invite Code:** [{invite.code}]({invite.url})\n" + f"**Channel:** {invite.channel}\n" + f"**Grants Temporary Membership:** {self.bot.tick(invite.temporary)}") + if invite.inviter: + embed.set_author(icon_url=invite.inviter.display_avatar.url, name=str(invite.inviter)) + embed.set_footer(text=f"Invite ID: {invite.id}") + await self.log(embed, guild=invite.guild.id, send_to=self.send_to.join_leave) \ No newline at end of file diff --git a/extensions/logging/member.py b/extensions/logging/member.py new file mode 100644 index 0000000..59fe4bd --- /dev/null +++ b/extensions/logging/member.py @@ -0,0 +1,156 @@ +import asyncio + +import discord +from discord.ext import commands + +from ._base import LoggingBase + + +class MemberLogs(LoggingBase): + @commands.Cog.listener("on_member_update") + async def logging_on_member_update(self, before: discord.Member, after: discord.Member): + if before.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[after.guild.id].member_update: + return + + await asyncio.sleep(1) + + embed = discord.Embed(title="Member Updated", colour=discord.Colour.blurple(), timestamp=discord.utils.utcnow()) + embed.set_author(name=str(after), icon_url=after.display_avatar.url) + embed.set_footer(text=f"User ID: {after.id}") + + deliver = False + + if before.avatar != after.avatar: + if after.avatar is not None: + embed.add_field( + name="Server Avatar updated:", + inline=False, + value=f"Member {'updated' if before.avatar else 'set'} their avatar.", + ) + embed.set_thumbnail(url=after.guild_avatar.url) # type: ignore + + else: + embed.add_field(name="Server Avatar updated:", inline=False, value="Member removed their avatar.") + embed.set_thumbnail(url=after.default_avatar.url) + + deliver = True + + if before.roles != after.roles: + added = set(after.roles) - set(before.roles) + removed = set(before.roles) - set(after.roles) + add = False + + if added: + added = f"**Added:**" + ", ".join([r.mention for r in added]) + add = True + + else: + added = "" + + if removed: + removed = f"**Removed:**" + ", ".join([r.mention for r in removed]) + add = True + + else: + removed = "" + + if add: + embed.add_field(name="Roles updated:", inline=False, value=f"{added}\n{removed}") + + deliver = True + + if before.nick != after.nick: + embed.add_field( + name="Nickname updated:", + inline=False, + value=f"**Before:** {discord.utils.escape_markdown(str(before.nick))}" + f"\n**After:** {discord.utils.escape_markdown(str(after.nick))}", + ) + deliver = True + + if deliver: + await self.log(embed, guild=after.guild, send_to=self.send_to.member) + + @commands.Cog.listener("on_user_update") + async def logging_on_user_update(self, before: discord.User, after: discord.User): + if after.id == self.bot.user.id: + return + + guilds = [g.id for g in before.mutual_guilds if g.id in self.bot.log_channels] + + if not guilds: + return + + deliver = False + embed = discord.Embed(title="User Updated", colour=discord.Colour.blurple(), timestamp=discord.utils.utcnow()) + embed.set_author(name=str(after), icon_url=after.display_avatar.url) + embed.set_footer(text=f"User ID: {after.id}") + + if before.avatar != after.avatar: + if after.avatar is not None: + embed.add_field( + name="Avatar updated:", + inline=False, + value=f"Member {'updated' if before.avatar else 'set'} their avatar.", + ) + embed.set_thumbnail(url=after.display_avatar.url) + + else: + embed.add_field(name="Avatar updated:", inline=False, value="Member removed their avatar.") + embed.set_thumbnail(url=after.default_avatar.url) + + deliver = True + + if before.name != after.name: + embed.add_field( + name="Changed Names:", + inline=False, + value=f"**Before:** {discord.utils.escape_markdown(before.name)}\n" + f"**After:** {discord.utils.escape_markdown(after.name)}", + ) + deliver = True + + if before.discriminator != after.discriminator: + embed.add_field( + name="Changed Discriminator:", + inline=False, + value=f"**Before:** {before.discriminator}\n**After:** {after.discriminator}", + ) + deliver = True + + if deliver: + for g in guilds: + if self.bot.guild_loggings[g].member_update: + await self.log(embed, guild=g, send_to=self.send_to.member) + + @commands.Cog.listener("on_member_ban") + async def logging_on_member_ban(self, guild: discord.Guild, user: discord.User): + if guild.id not in self.bot.log_channels or not self.bot.guild_loggings[guild.id].user_ban: + return + + embed = discord.Embed( + title="User Banned", + colour=discord.Colour.red(), + timestamp=discord.utils.utcnow(), + description=f"**Account Created:** {discord.utils.format_dt(user.created_at)} ({discord.utils.format_dt(user.created_at, style='R')})", + ) + + embed.set_author(name=str(user), icon_url=user.display_avatar.url) + embed.set_footer(text=f"User ID: {user.id}") + await self.log(embed, guild=guild, send_to=self.send_to.member) + + @commands.Cog.listener("on_member_unban") + async def logging_on_member_unban(self, guild: discord.Guild, user: discord.User): + if guild.id not in self.bot.log_channels or not self.bot.guild_loggings[guild.id].user_unban: + return + + embed = discord.Embed( + title="User Unbanned", + colour=discord.Colour.blurple(), + timestamp=discord.utils.utcnow(), + description=f"**Account Created:** {discord.utils.format_dt(user.created_at)} ({discord.utils.format_dt(user.created_at, style='R')})", + ) + + embed.set_author(name=str(user), icon_url=user.display_avatar.url) + embed.set_footer(text=f"User ID: {user.id}") + await self.log(embed, guild=guild, send_to=self.send_to.member) \ No newline at end of file diff --git a/extensions/logging/message.py b/extensions/logging/message.py new file mode 100644 index 0000000..501931a --- /dev/null +++ b/extensions/logging/message.py @@ -0,0 +1,116 @@ +import discord +from discord.ext import commands +from ._base import LoggingBase + + +class MessageLogs(LoggingBase): + @commands.Cog.listener("on_message_delete") + async def logger_on_message_delete(self, message: discord.Message) -> None: + if ( + message.author.bot + or not message.guild + or message.guild.id not in self.bot.log_channels + or not self.bot.guild_loggings[message.guild.id].message_delete + ): + return + + if message.guild.id in self.bot.log_channels: + embed = discord.Embed( + title=f"Message deleted in #{message.channel}", + description=(message.content or "\u200b")[0:4000], + colour=discord.Colour.red(), + timestamp=discord.utils.utcnow(), + ) + + embed.set_author(name=str(message.author), icon_url=message.author.display_avatar.url) + embed.set_footer(text=f"Channel: {message.channel.id}") + + if message.attachments: + embed.add_field(name="Attachments:", value="\n".join([a.filename for a in message.attachments]), inline=False) + + if message.stickers: + embed.add_field(name="Stickers:", value="\n".join([a.name for a in message.stickers]), inline=False) + + await self.log(embed, guild=message.guild, send_to=self.send_to.message) + + @commands.Cog.listener("on_raw_bulk_message_delete") + async def logging_on_raw_bulk_message_delete(self, payload: discord.RawBulkMessageDeleteEvent): + if not payload.guild_id or payload.guild_id not in self.bot.log_channels or not self.bot.guild_loggings[payload.guild_id].message_purge: + return + + embed = discord.Embed( + title=f"{len(payload.message_ids)} messages purged in #{self.bot.get_channel(payload.channel_id)}", + colour=discord.Colour.red(), + timestamp=discord.utils.utcnow(), + ) + + msgs = [] + + for message in payload.cached_messages: + if message.author.bot: + continue + + if message.attachments: + attachment = f"{len(message.attachments)} attachments: " + message.attachments[0].filename + + elif message.stickers: + attachment = "Sticker: " + message.stickers[0].name + + else: + attachment = None + + message = f"{discord.utils.remove_markdown(str(message.author))} > {message.content or attachment or '-'}" + + if len(message) > 200: + message = message[0:200] + "..." + + msgs.append(message) + if len("\n".join(msgs)[0:4096]) > 4000: + break + + embed.description = "\n".join(msgs)[0:4000] + embed.add_field(name="Showing: ", value=f"{len(msgs)}/{len(payload.message_ids)} messages.", inline=False) + embed.set_footer(text=f"Channel: {payload.channel_id}") + await self.log(embed, guild=payload.guild_id, send_to=self.send_to.message) + + @commands.Cog.listener("on_message_edit") + async def logger_on_message_edit(self, before: discord.Message, after: discord.Message): + if ( + not after.guild + or before.author.bot + or not before.guild + or before.guild.id not in self.bot.log_channels + or not self.bot.guild_loggings[after.guild.id].message_edit + ): + return + + if not self.bot.guild_loggings[before.guild.id].message_edit: + return + + if before.guild.id in self.bot.log_channels: + if before.content == after.content and before.attachments == after.attachments and before.stickers == after.stickers: + return + + embed = discord.Embed( + title=f"Message edited in #{before.channel}", + colour=discord.Colour.blurple(), + timestamp=discord.utils.utcnow(), + ) + embed.set_author(name=str(before.author), icon_url=before.author.display_avatar.url) + embed.set_footer(text=f"Channel: {before.channel.id}") + + embed.add_field(name="**__Before:__**", value=before.content[0:1024], inline=False) + embed.add_field(name="**__After:__**", value=after.content[0:1024], inline=False) + + if before.attachments and before.attachments != after.attachments: + af = after.attachments + attachments = [] + for a in before.attachments: + if a in af: + attachments.append(a.filename) + else: + attachments.append(f"[Removed] ~~{a.filename}~~") + embed.add_field(name="Attachments:", value="\n".join(attachments), inline=False) + + embed.add_field(name="Jump:", value=f"[[Jump to message]]({after.jump_url})", inline=False) + await self.log(embed, guild=before.guild, send_to=self.send_to.message) diff --git a/extensions/logging/server.py b/extensions/logging/server.py new file mode 100644 index 0000000..538b0ef --- /dev/null +++ b/extensions/logging/server.py @@ -0,0 +1,468 @@ +import discord + +import typing +from discord.ext import commands + +from ._base import LoggingBase, guild_channels + + +TOPIC_CHANNELS = typing.Union[discord.TextChannel, discord.ForumChannel, discord.StageChannel, discord.StageInstance] + + +class ServerLogs(LoggingBase): + @commands.Cog.listener("on_guild_channel_delete") + async def logger_on_guild_channel_delete(self, channel: guild_channels): + if ( + not channel.guild + or channel.guild.id not in self.bot.log_channels + or not self.bot.guild_loggings[channel.guild.id].channel_delete + ): + return + + embed = discord.Embed( + title=f"{channel.type} channel deleted.".title(), + description=f"**Name:** #{channel.name}" + f"\n**Category:** {channel.category}" + f"\n**Topic:** {discord.utils.remove_markdown(channel.topic or '') if isinstance(channel, TOPIC_CHANNELS) else 'None'}" + f"\n**Created at:** {discord.utils.format_dt(channel.created_at)}", + colour=discord.Colour.red(), + timestamp=discord.utils.utcnow(), + ) + + embed.set_footer(text=f"Channel ID: {channel.id}") + await self.log(embed, guild=channel.guild, send_to=self.send_to.server) + + @commands.Cog.listener("on_guild_channel_create") + async def logger_on_guild_channel_create(self, channel: guild_channels): + if channel.guild.id not in self.bot.log_channels: + return + + embed = discord.Embed( + title=f"{channel.type} channel Created".title(), + description=f"**Name:** #{channel.name}" + f"\n**Category:** {channel.category}" + f"\n**Topic:** {discord.utils.remove_markdown(channel.topic or '') if isinstance(channel, TOPIC_CHANNELS) else 'None'}", + colour=discord.Colour.green(), + timestamp=discord.utils.utcnow(), + ) + + for target, over in channel.overwrites.items(): + perms = [] + for perm, value in dict(over).items(): + if value is not None: + perms.append( + f"{str(perm).replace('guild', 'server').replace('_', ' ').title()} {self.bot.tick(value)}" + ) + + if perms: + embed.add_field(name=f"Permissions for {target}", value="\n".join(perms), inline=False) + + embed.set_footer(text=f"Channel ID: {channel.id}") + await self.log(embed, guild=channel.guild, send_to=self.send_to.server) + + @commands.Cog.listener("on_guild_channel_update") + async def logger_on_guild_channel_update(self, before: guild_channels, after: guild_channels): + if before.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[after.guild.id].channel_delete: + return + + deliver = False + embed = discord.Embed( + title=f"{before.type} channel updated".title(), + description=f"**Name:** #{after.name}\n**Category:** {getattr(after.category, 'name', 'no category')}", + colour=discord.Colour.blurple(), + timestamp=discord.utils.utcnow(), + ) + + embed.set_footer(text=f"Channel ID: {after.id}") + if before.name != after.name: + embed.add_field( + name="Name updated:", + value=f"**From:** {discord.utils.escape_markdown(before.name)}" + f"\n**To:** {discord.utils.escape_markdown(after.name)}", + inline=False, + ) + deliver = True + + if hasattr(before, "topic") and hasattr(after, "topic") and isinstance(before, TOPIC_CHANNELS) and isinstance(after, TOPIC_CHANNELS) and before.topic != after.topic: + embed.add_field( + name="Topic updated", + value=f"**Before:** {discord.utils.remove_markdown(str(before.topic))}" + f"\n**After:** {discord.utils.remove_markdown(str(after.topic))}", + inline=False, + ) + deliver = True + + if before.overwrites != after.overwrites: + targets = set.union(set(before.overwrites.keys()), set(after.overwrites.keys())) + + for target in targets: + updated_perms = [] + b_o = dict(before.overwrites_for(target)) + a_o = dict(after.overwrites_for(target)) + for perm, value in b_o.items(): + if value != a_o[perm]: + updated_perms.append( + f"{str(perm).replace('server', 'guild').replace('_', ' ').title()}: {self.bot.tick(value)} ➜ {self.bot.tick(a_o[perm])}" + ) + + if updated_perms: + perm_emb = discord.Embed( + title=f"Permissions for {target} in {after} updated", + colour=discord.Colour.blurple(), + timestamp=discord.utils.utcnow(), + description="\n".join(updated_perms), + ) + perm_emb.set_footer(text=f"Object ID: {target.id}\nChannel ID: {after.id}") + await self.log(perm_emb, guild=after.guild, send_to=self.send_to.server) + + if deliver: + await self.log(embed, guild=after.guild, send_to=self.send_to.server) + + @commands.Cog.listener("on_guild_update") + async def logger_on_guild_update(self, before: discord.Guild, after: discord.Guild): + if before.id not in self.bot.log_channels or not self.bot.guild_loggings[after.id].server_update: + return + if before.icon != after.icon: + if after.icon and before.icon: + embed = discord.Embed( + title="Server icon updated to:", timestamp=discord.utils.utcnow(), colour=discord.Colour.blue() + ) + embed.set_footer(text=f"Server ID: {after.id}") + embed.set_image(url=after.icon.url) + + elif after.icon and not before.icon: + embed = discord.Embed( + title="Server icon set:", timestamp=discord.utils.utcnow(), colour=discord.Colour.green() + ) + embed.set_footer(text=f"Server ID: {after.id}") + embed.set_image(url=after.icon.url) + + else: + embed = discord.Embed( + title="Server icon removed", timestamp=discord.utils.utcnow(), colour=discord.Colour.red() + ) + embed.set_footer(text=f"Server ID: {after.id}") + + await self.log(embed, guild=after, send_to=self.send_to.server) + + if before.name != after.name: + embed = discord.Embed( + title="Server Name Updated", + timestamp=discord.utils.utcnow(), + colour=discord.Colour.blurple(), + description=f"**Before:** {discord.utils.remove_markdown(before.name)}\n" + f"**After:** {discord.utils.escape_markdown(after.name)}", + ) + embed.set_footer(text=f"Server id: {after.id}") + await self.log(embed, guild=after, send_to=self.send_to.server) + + if before.owner != after.owner: + embed = discord.Embed( + title="Server Owner Updated!", + colour=discord.Colour.purple(), + timestamp=discord.utils.utcnow(), + description=f"**From:** {discord.utils.escape_markdown(str(before.owner))}\n" + f"**To:** {discord.utils.escape_markdown(str(after.owner))}", + ) + + await self.log(embed, guild=after, send_to=self.send_to.server) + + @commands.Cog.listener("on_guild_role_create") + async def logger_on_guild_role_create(self, role: discord.Role): + if role.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[role.guild.id].role_create: + return + embed = discord.Embed( + title="New Role Created", + timestamp=discord.utils.utcnow(), + colour=discord.Colour.green(), + description=f"**Name:** {role.name}\n" + f"**Show Separately:** {self.bot.tick(role.hoist)} β€’ **Color:** {role.color}\n" + f"**Mentionable:** {self.bot.tick(role.mentionable)} β€’ **Position:** {role.position}\n", + ) + enabled = ", ".join( + [ + str(name).replace("guild", "server").replace("_", " ").title() + for name, value in set(role.permissions) + if value is True + ] + ) + embed.add_field(name="Permissions enabled:", value=enabled, inline=False) + embed.set_footer(text=f"Role ID: {role.id}") + await self.log(embed, guild=role.guild, send_to=self.send_to.server) + + @commands.Cog.listener("on_guild_role_delete") + async def logger_on_guild_role_delete(self, role: discord.Role): + if role.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[role.guild.id].role_delete: + return + + embed = discord.Embed( + title="Role Deleted", + timestamp=discord.utils.utcnow(), + colour=discord.Colour.red(), + description=f"**Name:** {role.name}\n" + f"**Show Separately:** {self.bot.tick(role.hoist)} β€’ **Color:** {role.color}\n" + f"**Mentionable:** {self.bot.tick(role.mentionable)} β€’ **Position:** {role.position}\n" + f"**Created At:** {discord.utils.format_dt(role.created_at)} ({discord.utils.format_dt(role.created_at, style='R')})\n" + f"**Amount of Members:** {len(role.members)}", + ) + + enabled = ", ".join( + [ + str(name).replace("guild", "server").replace("_", " ").title() + for name, value in set(role.permissions) + if value is True + ] + ) + + embed.add_field(name="Permissions enabled:", value=enabled, inline=False) + embed.set_footer(text=f"Role ID: {role.id}") + await self.log(embed, guild=role.guild, send_to=self.send_to.server) + + @commands.Cog.listener("on_guild_role_update") + async def logger_on_guild_role_update(self, before: discord.Role, after: discord.Role): + if before.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[after.guild.id].role_edit: + return + embed = discord.Embed(title="Role Updated", timestamp=discord.utils.utcnow(), colour=discord.Colour.blurple()) + deliver = False + + if before.permissions != after.permissions: + before_true = [ + str(name).replace("guild", "server").replace("_", " ").title() + for name, value in set(before.permissions) + if value is True + ] + + before_false = [ + str(name).replace("guild", "server").replace("_", " ").title() + for name, value in set(before.permissions) + if value is False + ] + + after_true = [ + str(name).replace("guild", "server").replace("_", " ").title() + for name, value in set(after.permissions) + if value is True + ] + + after_false = [ + str(name).replace("guild", "server").replace("_", " ").title() + for name, value in set(after.permissions) + if value is False + ] + + added = "" + if before_true != after_true: + added = set(after_true) - set(before_true) + if added: + added = f"**Added:** {', '.join(added)}\n" + + else: + added = "" + + removed = "" + if after_false != before_false: + removed = set(after_false) - set(before_false) + if removed: + removed = f"**Removed:** {', '.join(removed)}" + + else: + removed = "" + + embed.add_field(name="Permissions Updated:", value=added + removed, inline=False) + deliver = True + + hoist_update = "" + if before.hoist != after.hoist: + hoist_update = ( + f"\n**Show Separately:** {self.bot.tick(before.hoist)} ➜ {self.bot.tick(after.hoist)}" + ) + deliver = True + + ping_update = "" + if before.mentionable != after.mentionable: + ping_update = f"\n**Mentionable:** {self.bot.tick(before.mentionable)} ➜ {self.bot.tick(after.mentionable)}" + deliver = True + + role_update = f"**Name:** {after.name}" + if before.name != after.name: + role_update = ( + f"**Name:**\n**Before:** {discord.utils.remove_markdown(before.name)}" + f"\n**After:** {discord.utils.remove_markdown(after.name)}" + ) + deliver = True + + color_update = "" + if before.color != after.color: + color_update = f"\n**Updated Color:** `{before.color}` ➜ `{after.color}`" + deliver = True + + position_update = "" + if before.position != after.position: + position_update = f"\n**Updated Position:** `{before.position}` ➜ `{after.position}`" + + + embed.description = role_update + hoist_update + ping_update + color_update + position_update + if deliver: + await self.log(embed, guild=after.guild, send_to=self.send_to.server) + + @commands.Cog.listener("on_guild_emojis_update") + async def logger_on_guild_emojis_update( + self, guild: discord.Guild, before: typing.Sequence[discord.Emoji], after: typing.Sequence[discord.Emoji] + ): + if guild.id not in self.bot.log_channels: + return + + added = [e for e in after if e not in before] + removed = [e for e in before if e not in after] + + for emoji in added: + if not self.bot.guild_loggings[guild.id].emoji_create: + break + + embed = discord.Embed( + title="Emoji Created", + colour=discord.Colour.green(), + timestamp=discord.utils.utcnow(), + description=f"{emoji} - [{emoji.name}]({emoji.url})", + ) + embed.set_footer(text=f"Emoji ID: {emoji.id}") + await self.log(embed, guild=guild, send_to=self.send_to.server) + + for emoji in removed: + if not self.bot.guild_loggings[guild.id].emoji_delete: + break + + embed = discord.Embed( + title="Emoji Deleted", + colour=discord.Colour.red(), + timestamp=discord.utils.utcnow(), + description=f"{emoji.name}" + f"\n**Created at:** {discord.utils.format_dt(emoji.created_at)} ({discord.utils.format_dt(emoji.created_at, style='R')})", + ) + embed.set_footer(text=f"Emoji ID: {emoji.id}") + await self.log(embed, guild=guild, send_to=self.send_to.server) + + existant = set.union(set(after) - set(added), set(before) - set(removed)) + + for emoji in existant: + if not self.bot.guild_loggings[guild.id].emoji_update: + break + + before_emoji = discord.utils.get(before, id=emoji.id) + after_emoji = emoji + + if before_emoji and after_emoji: + await self.emoji_update(guild, before_emoji, after_emoji) + + async def emoji_update(self, guild: discord.Guild, before: discord.Emoji, after: discord.Emoji): + if before.name == after.name and before.roles == after.roles: + return + + if not self.bot.guild_loggings[guild.id].emoji_update: + return + + embed = discord.Embed( + title="Emoji Updated", + colour=discord.Colour.blurple(), + timestamp=discord.utils.utcnow(), + description=f"{str(after)} | [{after.name}]({after.url})", + ) + + embed.set_footer(text=f"Emoji ID: {after.id}") + + if before.name != after.name: + embed.add_field(name="Name updated:", inline=False, value=f"**Before:** {before.name}\n**After:** {after.name}") + + if before.roles != after.roles: + added = set(after.roles) - set(before.roles) + removed = set(before.roles) - set(after.roles) + added_roles = "" + if added: + added_roles = f"**Added:** {', '.join([r.mention for r in added])}" + removed_roles = "" + + if removed: + removed_roles = f"\n**Removed:** {', '.join([r.mention for r in removed])}" + embed.add_field(name="Roles updated", inline=False, value=added_roles + removed_roles) + + await self.log(embed, guild=guild, send_to=self.send_to.server) + + @commands.Cog.listener("on_guild_stickers_update") + async def logger_on_guild_stickers_update( + self, guild: discord.Guild, before: typing.Sequence[discord.Sticker], after: typing.Sequence[discord.Sticker] + ): + if guild.id not in self.bot.log_channels: + return + + added = [s for s in after if s not in before] + removed = [s for s in before if s not in after] + + for sticker in added: + if not self.bot.guild_loggings[guild.id].sticker_create: + break + + embed = discord.Embed( + title="Sticker Created", + colour=discord.Colour.green(), + timestamp=discord.utils.utcnow(), + description=f"[{sticker.name}]({sticker.url})", + ) + + if sticker.description: + embed.add_field(name="Description:", value=sticker.description, inline=False) + + embed.set_footer(text=f"Sticker ID: {sticker.id}") + await self.log(embed, guild=guild, send_to=self.send_to.server) + + for sticker in removed: + if not self.bot.guild_loggings[guild.id].sticker_delete: + break + + embed = discord.Embed( + title="Sticker Deleted", + colour=discord.Colour.red(), + timestamp=discord.utils.utcnow(), + description=f"{sticker.name}" + f"\n**Created at:** {discord.utils.format_dt(sticker.created_at)} ({discord.utils.format_dt(sticker.created_at, style='R')})", + ) + + if sticker.description: + embed.add_field(name="Description:", value=sticker.description, inline=False) + + embed.set_footer(text=f"Sticker ID: {sticker.id}") + await self.log(embed, guild=guild, send_to=self.send_to.server) + + existant = set.union(set(after) - set(added), set(before) - set(removed)) + for sticker in existant: + if not self.bot.guild_loggings[guild.id].sticker_update: + break + + before_sticker = discord.utils.get(before, id=sticker.id) + after_sticker = sticker + + if before_sticker and after_sticker: + await self.sticker_update(guild, before_sticker, after_sticker) + + async def sticker_update(self, guild: discord.Guild, before: discord.Sticker, after: discord.Sticker): + if before.description == after.description and before.name == after.name: + return + + if not self.bot.guild_loggings[guild.id].sticker_update: + return + + embed = discord.Embed( + title="Sticker Updated", + colour=discord.Colour.blurple(), + timestamp=discord.utils.utcnow(), + description=f"[{after.name}]({after.url})", + ) + + if before.name != after.name: + embed.add_field(name="Name updated:", inline=False, value=f"**Before:** {before.name}\n**After:** {after.name}") + + if before.description != after.description: + embed.add_field( + name="Name updated:", inline=False, value=f"**Before:** {before.description}\n**After:** {after.description}" + ) + + await self.log(embed, guild=guild, send_to=self.send_to.server) \ No newline at end of file diff --git a/extensions/logging/voice.py b/extensions/logging/voice.py new file mode 100644 index 0000000..dd05ec0 --- /dev/null +++ b/extensions/logging/voice.py @@ -0,0 +1,118 @@ +import discord +from discord.ext import commands + +from ._base import LoggingBase + + +class VoiceLogs(LoggingBase): + @commands.Cog.listener("on_voice_state_update") + async def logging_on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): + if member.guild.id not in self.bot.log_channels: + return + + if before.channel and after.channel and before.channel != after.channel and self.bot.guild_loggings[member.guild.id].voice_move: + embed = discord.Embed( + title="Member moved voice channels:", + colour=discord.Colour.blurple(), + timestamp=discord.utils.utcnow(), + description=f"**From:** {before.channel.mention} ({after.channel.id})" f"\n**To:** {after.channel.mention} ({after.channel.id})", + ) + + embed.set_author(name=str(member), icon_url=member.display_avatar.url) + embed.set_footer(text=f"Member ID: {member.id}") + await self.log(embed, guild=member.guild, send_to=self.send_to.voice) + + if not before.channel and after.channel and self.bot.guild_loggings[member.guild.id].voice_join: + embed = discord.Embed( + title="Member joined a voice channel:", + colour=discord.Colour.green(), + timestamp=discord.utils.utcnow(), + description=f"**Joined:** {after.channel.mention} ({after.channel.id})", + ) + + embed.set_author(name=str(member), icon_url=member.display_avatar.url) + embed.set_footer(text=f"Member ID: {member.id}") + await self.log(embed, guild=member.guild, send_to=self.send_to.voice) + + if before.channel and not after.channel and self.bot.guild_loggings[member.guild.id].voice_leave: + embed = discord.Embed( + title="Member left a voice channel:", + colour=discord.Colour.red(), + timestamp=discord.utils.utcnow(), + description=f"**Left:** {before.channel.mention} ({before.channel.id})", + ) + + embed.set_author(name=str(member), icon_url=member.display_avatar.url) + embed.set_footer(text=f"Member ID: {member.id}") + await self.log(embed, guild=member.guild, send_to=self.send_to.voice) + + if not self.bot.guild_loggings[member.guild.id].voice_mod: + return + + if before.deaf != after.deaf: + if after.deaf: + embed = discord.Embed( + title="Member Deafened by a Moderator", + colour=discord.Colour.dark_gold(), + timestamp=discord.utils.utcnow(), + ) + embed.set_author(name=str(member), icon_url=member.display_avatar.url) + embed.set_footer(text=f"Member ID: {member.id}") + await self.log(embed, guild=member.guild, send_to=self.send_to.voice) + + if before.deaf: + embed = discord.Embed( + title="Member Un-deafened by a Moderator", + colour=discord.Colour.yellow(), + timestamp=discord.utils.utcnow(), + ) + embed.set_author(name=str(member), icon_url=member.display_avatar.url) + embed.set_footer(text=f"Member ID: {member.id}") + await self.log(embed, guild=member.guild, send_to=self.send_to.voice) + + if before.mute != after.mute: + if after.mute: + embed = discord.Embed(title="Member Muted by a Moderator", colour=discord.Colour.dark_gold(), timestamp=discord.utils.utcnow()) + embed.set_author(name=str(member), icon_url=member.display_avatar.url) + embed.set_footer(text=f"Member ID: {member.id}") + await self.log(embed, guild=member.guild, send_to=self.send_to.voice) + + if before.mute: + embed = discord.Embed(title="Member Un-muted by a Moderator", colour=discord.Colour.yellow(), timestamp=discord.utils.utcnow()) + embed.set_author(name=str(member), icon_url=member.display_avatar.url) + embed.set_footer(text=f"Member ID: {member.id}") + await self.log(embed, guild=member.guild, send_to=self.send_to.voice) + + @commands.Cog.listener("on_stage_instance_create") + async def logging_on_stage_instance_create(self, stage_instance: discord.StageInstance): + if stage_instance.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[stage_instance.guild.id].stage_open: + return + + embed = discord.Embed( + title="Stage opened", + colour=discord.Colour.teal(), + timestamp=discord.utils.utcnow(), + description=f"**Channel** <#{stage_instance.channel_id}> ({stage_instance.channel_id})\n" f"**Topic:** {stage_instance.topic}\n", + ) + + embed.set_footer(text=f"Channel ID: {stage_instance.channel_id}") + await self.log(embed, guild=stage_instance.guild, send_to=self.send_to.voice) + + @commands.Cog.listener("on_stage_instance_delete") + async def logging_on_stage_instance_delete(self, stage_instance: discord.StageInstance): + if stage_instance.guild.id not in self.bot.log_channels or not self.bot.guild_loggings[stage_instance.guild.id].stage_close: + return + + embed = discord.Embed( + title="Stage closed", + colour=discord.Colour.dark_teal(), + timestamp=discord.utils.utcnow(), + description=f"**Channel** <#{stage_instance.channel_id}> ({stage_instance.channel_id})\n" f"**Topic:** {stage_instance.topic}\n", + ) + + embed.set_footer(text=f"Channel ID: {stage_instance.channel_id}") + await self.log(embed, guild=stage_instance.guild, send_to=self.send_to.voice) + + @commands.Cog.listener("on_stage_instance_update") + async def logging_on_stage_instance_update(self, before: discord.StageInstance, after: discord.StageInstance): + pass diff --git a/utils/context.py b/utils/context.py new file mode 100644 index 0000000..04c54f2 --- /dev/null +++ b/utils/context.py @@ -0,0 +1,153 @@ +import typing +import random +import discord + +from discord import Interaction +from discord.ext import commands +from typing import TYPE_CHECKING + +from utils import tick +from discord import PartialEmoji, ButtonStyle, Interaction + +if TYPE_CHECKING: + from bot import Korii +else: + from discord.ext.commands import Bot as Korii + +target_type = typing.Union[discord.Member, discord.User, discord.PartialEmoji, discord.Guild, discord.Invite] + +class ConfirmButton(discord.ui.Button): + def __init__(self, label: str, emoji: str, button_style: discord.ButtonStyle): + super().__init__(style=button_style, label=label, emoji=emoji, ) + + async def callback(self, interaction: discord.Interaction): + assert self.view is not None + view: Confirm = self.view + view.value = True + view.stop() + + +class CancelButton(discord.ui.Button): + def __init__(self, label: str, emoji: str, button_style: discord.ButtonStyle): + super().__init__(style=button_style, label=label, emoji=emoji) + + async def callback(self, interaction: discord.Interaction): + assert self.view is not None + view: Confirm = self.view + view.value = False + view.stop() + + +class Confirm(discord.ui.View): + def __init__(self, buttons: typing.Tuple[typing.Tuple[typing.Optional[typing.Union[PartialEmoji, str]], str, ButtonStyle], typing.Tuple[typing.Optional[typing.Union[PartialEmoji, str]], str, ButtonStyle]], timeout: int = 30): + super().__init__(timeout=timeout) + self.message = None + self.value = None + self.ctx: CustomContext = None + self.add_item(ConfirmButton(emoji=buttons[0][0], + label=buttons[0][1], + button_style=( + buttons[0][2] or discord.ButtonStyle.green + ))) + self.add_item(CancelButton(emoji=buttons[1][0], + label=buttons[1][1], + button_style=( + buttons[1][2] or discord.ButtonStyle.red + ))) + + async def interaction_check(self, interaction: Interaction) -> bool: + if interaction.user and interaction.user.id in (self.ctx.bot.owner_id, self.ctx.author.id): + return True + messages = [ + "Oh no you can't do that! This belongs to **{user}**", + 'This is **{user}**\'s confirmation, sorry! πŸ’’', + 'πŸ˜’ Does this look yours? **No**. This is **{user}**\'s confirmation button', + f'STOP IT GET SOME HELP', + 'HEYYYY!!!!! this is **{user}**\'s menu.', + 'Sorry but you can\'t mess with **{user}**\' menu :(', + 'No. just no. This is **{user}**\'s menu.', + 'Stop.', + 'You don\'t look like {user} do you...', + '🀨 That\'s not yours! That\'s **{user}**\'s menu', + '🧐 Whomst! you\'re not **{user}**', + '_out!_ πŸ‘‹' + ] + await interaction.response.send_message(random.choice(messages).format(user=self.ctx.author.display_name), + ephemeral=True) + + return False + + +class CustomContext(commands.Context): + bot: Korii + + @staticmethod + def default_tick(opt: bool, text: str = None) -> str: + emoji = tick(opt) + if text: + return f"{emoji} {text}" + return emoji + + async def confirm(self, message: str = 'Do you want to confirm?', + buttons: typing.Optional[typing.Tuple[typing.Union[discord.PartialEmoji, str], + str, discord.ButtonStyle]] = None, timeout: int = 30, + delete_after_confirm: bool = False, + delete_after_timeout: bool = False, + delete_after_cancel: bool | None = None, + return_message: bool = False) \ + -> typing.Union[bool, typing.Tuple[bool, discord.Message]]: + """ A confirmation menu. """ + + delete_after_cancel = delete_after_cancel if delete_after_cancel is not None else delete_after_confirm + + view = Confirm(buttons=buttons or ( + (None, 'Confirm', discord.ButtonStyle.green), + (None, 'Cancel', discord.ButtonStyle.red) + ), timeout=timeout) + view.ctx = self + message = await self.send(message, view=view) + await view.wait() + if False in (delete_after_cancel, delete_after_confirm, delete_after_timeout): + view.children = [view.children[0]] + for c in view.children: + c.disabled = True + if view.value is False: + c.label = 'Cancelled!' + c.emoji = None + c.style = discord.ButtonStyle.red + elif view.value is True: + c.label = 'Confirmed!' + c.emoji = None + c.style = discord.ButtonStyle.green + else: + c.label = 'Timed out!' + c.emoji = '⏰' + c.style = discord.ButtonStyle.gray + view.stop() + if view.value is None: + + try: + if return_message is False: + (await message.edit(view=view)) if delete_after_timeout is False else (await message.delete()) + except (discord.Forbidden, discord.HTTPException): + pass + return (None, message) if delete_after_timeout is False and return_message is True else None + + elif view.value: + + try: + if return_message is False: + (await message.edit(view=view)) if delete_after_confirm is False else (await message.delete()) + except (discord.Forbidden, discord.HTTPException): + pass + return (True, message) if delete_after_confirm is False and return_message is True else True + + else: + + try: + if return_message is False: + (await message.edit(view=view)) if delete_after_cancel is False else (await message.delete()) + except (discord.Forbidden, discord.HTTPException): + pass + + return (False, message) if delete_after_cancel is False and return_message is True else False \ No newline at end of file diff --git a/utils/logging.py b/utils/logging.py new file mode 100644 index 0000000..7ce53f2 --- /dev/null +++ b/utils/logging.py @@ -0,0 +1,159 @@ +@fill_with_flags() +class LoggingEventsFlags(BaseFlags): + def __init__(self, permissions: int = 0, **kwargs: bool): + super().__init__(**kwargs) + if not isinstance(permissions, int): + raise TypeError(f"Expected int parameter, received {permissions.__class__.__name__} instead.") + self.value = permissions + for key, value in kwargs.items(): + if key not in self.VALID_FLAGS: + raise TypeError(f"{key!r} is not a valid permission name.") + setattr(self, key, value) + + @classmethod + def all(cls): + bits = max(cls.VALID_FLAGS.values()).bit_length() + value = (1 << bits) - 1 + self = cls.__new__(cls) + self.value = value + return self + + @classmethod + def message(cls): + return cls(0b000000000000000000000000000111) + + @classmethod + def join_leave(cls): + return cls(0b000000000000000000011000011000) + + @classmethod + def member(cls): + return cls(0b000000000000000000000111100000) + + @classmethod + def voice(cls): + return cls(0b000000110000000111100000000000) + + @classmethod + def server(cls): + return cls(0b111111111111111000000000000000) + + @flag_value + def message_delete(self): + return 1 << 0 + + @flag_value + def message_purge(self): + return 1 << 1 + + @flag_value + def message_edit(self): + return 1 << 2 + + @flag_value + def member_join(self): + return 1 << 3 + + @flag_value + def member_leave(self): + return 1 << 4 + + @flag_value + def member_update(self): + return 1 << 5 + + @flag_value + def user_ban(self): + return 1 << 6 + + @flag_value + def user_unban(self): + return 1 << 7 + + @flag_value + def user_update(self): + return 1 << 8 + + @flag_value + def invite_create(self): + return 1 << 9 + + @flag_value + def invite_delete(self): + return 1 << 10 + + @flag_value + def voice_join(self): + return 1 << 11 + + @flag_value + def voice_leave(self): + return 1 << 12 + + @flag_value + def voice_move(self): + return 1 << 13 + + @flag_value + def voice_mod(self): + return 1 << 14 + + @flag_value + def emoji_create(self): + return 1 << 15 + + @flag_value + def emoji_delete(self): + return 1 << 16 + + @flag_value + def emoji_update(self): + return 1 << 17 + + @flag_value + def sticker_create(self): + return 1 << 18 + + @flag_value + def sticker_delete(self): + return 1 << 19 + + @flag_value + def sticker_update(self): + return 1 << 20 + + @flag_value + def server_update(self): + return 1 << 21 + + @flag_value + def stage_open(self): + return 1 << 22 + + @flag_value + def stage_close(self): + return 1 << 23 + + @flag_value + def channel_create(self): + return 1 << 24 + + @flag_value + def channel_delete(self): + return 1 << 25 + + @flag_value + def channel_edit(self): + return 1 << 26 + + @flag_value + def role_create(self): + return 1 << 27 + + @flag_value + def role_delete(self): + return 1 << 28 + + @flag_value + def role_edit(self): + return 1 << 29 diff --git a/utils/utils.py b/utils/utils.py index c924fa3..2e5c4d4 100644 --- a/utils/utils.py +++ b/utils/utils.py @@ -9,6 +9,15 @@ from bot import Korii + +def tick(boolean: bool | None): + if boolean == True: + return "<:yes:1036761765322686505>" + elif boolean == False: + return "<:no:1036761731806003210>" + else: + return "<:icons_hyphen:1240731518175809556>" + async def shorten_text(bot: Korii, text: str, length: int | None = None, code: int | None = None, link: bool = False): """A function to shorten text shorten_text("Hello World", 6) --> Hello @@ -147,3 +156,17 @@ def get_member_permissions(permissions: discord.Permissions): perms.append("Create instant invites") return ", ".join(perms) if perms else 'No permissions' + +def col(color=None, /, *, fmt=0, bg=False): + base = "\u001b[" + if fmt != 0: + base += "{fmt};" + if color is None: + base += "{color}m" + color = 0 + else: + if bg is True: + base += "4{color}m" + else: + base += "3{color}m" + return base.format(fmt=fmt, color=color) \ No newline at end of file