From 31cbcd8a3143f29cc33ff7d74a7a08ffb11657e5 Mon Sep 17 00:00:00 2001 From: PiotrWieczorek Date: Sun, 22 Dec 2024 14:47:40 +0100 Subject: [PATCH] refactored code --- src/audio_player.py | 62 +++-- src/cogs/admin_cog.py | 42 ++- src/cogs/audio_cog.py | 181 ++++++------- src/discord_bot.py | 98 +++---- src/main.py | 76 +++--- src/utils/decorators.py | 108 ++++---- src/utils/endpoints.py | 124 +++++---- src/views/audio_player_view.py | 458 +++++++++++++++++++-------------- 8 files changed, 627 insertions(+), 522 deletions(-) diff --git a/src/audio_player.py b/src/audio_player.py index ff91e14..27875bf 100644 --- a/src/audio_player.py +++ b/src/audio_player.py @@ -6,7 +6,7 @@ class AudioPlayer(wavelink.Player): """ - Expands default wavelink player class functionality + Extends the functionality of the default Wavelink player class. """ def __init__( @@ -14,59 +14,87 @@ def __init__( client: discord.Client = MISSING, channel: Connectable = MISSING, *, - nodes: list[wavelink.Node] | None = None + nodes: list[wavelink.Node] | None = None, ): super().__init__(client=client, channel=channel, nodes=nodes) self._filters_applied = False @property - def filters_applied(self): + def filters_applied(self) -> bool: + """ + Indicates whether audio filters are currently applied. + """ return self._filters_applied - async def play_track(self, playable: wavelink.Search, start_time: int): + async def play_track(self, playable: wavelink.Search, start_time: int = 0) -> None: + """ + Plays a track, starting at a specific time. + + Args: + playable (wavelink.Search): The track to be played. + start_time (int): The time (in seconds) to start playback. Defaults to 0. + """ self.autoplay = wavelink.AutoPlayMode.partial await self.queue.put_wait(playable) if not self.playing: await self.play(self.queue.get(), start=start_time) - async def play_previous_track(self): + async def play_previous_track(self) -> None: + """ + Plays the previously played track from the queue history. + """ queue = self.queue history = self.queue.history - # If player is currently playing a track then last object in history is that track - # (because button is disabled for the very first track) - # otherwise last object in history is a previous track if self.playing: current_track = history[-1] previous_track = history[-2] - queue._queue.appendleft(current_track) - await history.delete(-1) + queue._queue.appendleft(current_track) # Moves the current track back to the queue + await history.delete(-1) # Removes the current track from history else: previous_track = history[-1] await self.play(previous_track, add_history=False) - async def play_track_from_queue(self, index: int): + async def play_track_from_queue(self, index: int) -> None: + """ + Plays a specific track from the queue by index. + + Args: + index (int): The index of the track in the queue. + """ track = self.queue[index] await self.queue.delete(index) await self.play(track) - async def play_track_from_history(self, index: int): + async def play_track_from_history(self, index: int) -> None: + """ + Plays a specific track from the queue history by index. + + Args: + index (int): The index of the track in the history. + """ history = self.queue.history track = history[index] await history.delete(index) await self.play(track) - async def disable_filters(self): + async def disable_filters(self) -> None: + """ + Disables all currently applied audio filters. + """ await self.set_filters() self._filters_applied = False - async def toggle_nightcore_filter(self): + async def toggle_nightcore_filter(self) -> None: + """ + Toggles the Nightcore audio filter on or off. + """ if self._filters_applied: - await self.set_filters() + await self.set_filters() # Resets filters to default else: - filters: wavelink.Filters = wavelink.Filters() - filters.timescale.set(pitch=1.2, speed=1.1, rate=1) + filters = wavelink.Filters() + filters.timescale.set(pitch=1.2, speed=1.1, rate=1.0) filters.equalizer.reset() await self.set_filters(filters) diff --git a/src/cogs/admin_cog.py b/src/cogs/admin_cog.py index fe39012..4676371 100644 --- a/src/cogs/admin_cog.py +++ b/src/cogs/admin_cog.py @@ -13,57 +13,73 @@ class AdminCog(commands.Cog): """ - Class used for administrative bot commands + Class used for administrative bot commands. """ def __init__(self, bot: DiscordBot) -> None: super().__init__() - self.bot: DiscordBot = bot + self.bot = bot - # Implement meta functions @self.bot.command() @commands.guild_only() async def sync( ctx: Context, guilds: Greedy[discord.Object], spec: Optional[Literal["~", "*", "^"]] = None ) -> None: """ - This command sync slash commands with discord + Syncs slash commands with Discord. + + Parameters: + guilds (Greedy[discord.Object]): Guilds to sync the commands to. + spec (Literal["~", "*", "^"], optional): Syncing option: + - "~": Sync only to the current guild. + - "*": Copy global commands to the current guild. + - "^": Clear commands from the current guild. + - None: Sync globally. """ bot = cast(DiscordBot, ctx.bot) + + # Handle syncing based on spec and guilds if not guilds: if spec == "~": synced = await bot.tree.sync(guild=ctx.guild) + location = "to the current guild" elif spec == "*": bot.tree.copy_global_to(guild=ctx.guild) synced = await bot.tree.sync(guild=ctx.guild) + location = "to the current guild (including global commands)" elif spec == "^": bot.tree.clear_commands(guild=ctx.guild) await bot.tree.sync(guild=ctx.guild) synced = [] + location = "after clearing commands from the current guild" else: synced = await bot.tree.sync() + location = "globally" - is_spec = 'globally' if spec is None else 'to the current guild.' - await ctx.send(f"Synced {len(synced)} commands {is_spec}") + await ctx.send(f"Synced {len(synced)} commands {location}.") return - ret = 0 + # Sync to specific guilds + successful_syncs = 0 for guild in guilds: try: await bot.tree.sync(guild=guild) except discord.HTTPException: - pass + continue else: - ret += 1 + successful_syncs += 1 - await ctx.send(f"Synced the tree to {ret}/{len(guilds)}.") + await ctx.send(f"Synced the tree to {successful_syncs}/{len(guilds)} guilds.") @commands.guild_only() @app_commands.command(name="restart") - async def restart_bot(self, interaction: discord.Interaction, member: discord.Member = None) -> None: + async def restart_bot(self, interaction: discord.Interaction) -> None: """ - This command exits the program which should automatically reboot the container + Restarts the bot by exiting the program, which should trigger a container reboot. + + Parameters: + interaction (discord.Interaction): The interaction triggering the command. """ await interaction.response.send_message(content="BRB") - logging.warning('Restart called from %d', interaction.guild.id) + logging.warning("Restart called from guild: %d", interaction.guild.id) sys.exit() diff --git a/src/cogs/audio_cog.py b/src/cogs/audio_cog.py index 280dda9..583da00 100644 --- a/src/cogs/audio_cog.py +++ b/src/cogs/audio_cog.py @@ -21,8 +21,8 @@ class AudioCog(commands.Cog): """ - Class for music commands. - self.views is dictionary that holds handle to a message with audio controls view {guild_id:view}, + Cog for handling music commands and soundboard functionality. + Manages a dictionary of audio control views for each guild. """ def __init__(self, bot: DiscordBot) -> None: @@ -41,31 +41,27 @@ def __del__(self): @user_is_in_voice_channel_check async def play(self, interaction: discord.Interaction, search: str) -> None: """ - To use soundboard type audio ID from list. To use YouTube type url or a search phrase. + Play audio from soundboard or YouTube. Supports search phrases or URLs. """ - await interaction.response.send_message(f"Looking for {search}...") + await interaction.response.send_message(f"Searching for: {search}...") guild_id = interaction.guild_id user_channel = interaction.user.voice.channel - # Connect to vc or change vc to the one caller is in player = cast(AudioPlayer, interaction.guild.voice_client) if not player or not player.connected: player = await user_channel.connect(cls=AudioPlayer, timeout=20) elif player.channel != user_channel: await player.move_to(user_channel) - view = self.views.get(guild_id, None) + view = self.views.get(guild_id) if not view: view = AudioPlayerView(self.bot, interaction.channel) self.views[guild_id] = view try: result, start_time = await self.__search_tracks(search, guild_id) - if isinstance(result, wavelink.Playlist): - await interaction.edit_original_response(content=f"Found \"{result.name}\".") - else: - await interaction.edit_original_response(content=f"Found \"{result.title}\".") - + response = f"Found: \"{result.name if isinstance(result, wavelink.Playlist) else result.title}\"." + await interaction.edit_original_response(content=response) await player.play_track(result, start_time) await view.send_embed() except Exception as err: @@ -78,182 +74,149 @@ async def play(self, interaction: discord.Interaction, search: str) -> None: @user_is_in_voice_channel_check async def skip(self, interaction: discord.Interaction) -> None: """ - To use soundboard type audio ID from list. To use YouTube type url or a search phrase. + Skip the current track. """ player = cast(AudioPlayer, interaction.guild.voice_client) await player.skip() if player.queue: - await interaction.response.send_message(f"Skipped track to \"{player.current.title}\".") + await interaction.response.send_message(f"Skipped to: \"{player.current.title}\".") else: - await interaction.response.send_message("Skipped track.") + await interaction.response.send_message("Track skipped.") @commands.cooldown(rate=1, per=1) @commands.guild_only() @app_commands.command(name="disconnect") async def disconnect(self, interaction: discord.Interaction) -> None: """ - Simple disconnect command. + Disconnect the bot from the voice channel. """ player = cast(AudioPlayer, interaction.guild.voice_client) - if not player.connected: - await interaction.response.send_message( - content='Bot is not connected to any voice channel', ephemeral=True, delete_after=3 - ) + if not player or not player.connected: + await interaction.response.send_message("Not connected to any voice channel.", ephemeral=True, delete_after=3) return - await self.__remove_view_and_disconnect_player(player) - await interaction.response.send_message(content='Bot disconnected', ephemeral=True, delete_after=3) + await self.__remove_view_and_disconnect_player(player) + await interaction.response.send_message("Bot disconnected.", ephemeral=True, delete_after=3) @commands.guild_only() @app_commands.command(name="soundboard") async def list_soundboard(self, interaction: discord.Interaction): """ - Lists all audio files uploaded to soundboard + List all audio files uploaded to the soundboard. """ - await interaction.response.send_message("Preparing list...") - guild_soundboard = Endpoints.get_soundboard(interaction.guild_id) - if not guild_soundboard: - await interaction.edit_original_response(content='No files uploaded!') + await interaction.response.send_message("Preparing soundboard list...") + soundboard = Endpoints.get_soundboard(interaction.guild_id) + if not soundboard: + await interaction.edit_original_response(content="No files uploaded!") return - message_content = "SOUNDBOARD\n" - i = 0 - for entry in guild_soundboard: - i += 1 - message_content += f"{i}. {entry.replace('_', ' - ', 1).replace('_', ' ').capitalize().split('.mp3')[0]}\n" + content = "SOUNDBOARD\n" + "\n".join( + f"{i + 1}. {entry.replace('_', ' - ', 1).replace('_', ' ').capitalize().split('.mp3')[0]}" + for i, entry in enumerate(soundboard) + ) - file = discord.File(fp=BytesIO(message_content.encode("utf8")), filename="soundboard.cpp") - await interaction.edit_original_response(content='', attachments=[file]) + file = discord.File(BytesIO(content.encode("utf-8")), filename="soundboard.txt") + await interaction.edit_original_response(content="", attachments=[file]) @commands.guild_only() @app_commands.command(name="volume") async def set_volume(self, interaction: discord.Interaction, value: int): """ - Set volume. Range: 0-100 + Set the bot's playback volume (0-100). """ - if value < 0 or value > 100: - await interaction.response.send_message("Value must be between 0 and 100", ephemeral=True, delete_after=3) + if not (0 <= value <= 100): + await interaction.response.send_message("Volume must be between 0 and 100.", ephemeral=True, delete_after=3) return player = cast(AudioPlayer, interaction.guild.voice_client) if player.connected: await player.set_volume(value) - await interaction.response.send_message(f"Value set to {value}", delete_after=15) + await interaction.response.send_message(f"Volume set to {value}.", delete_after=15) else: - await interaction.response.send_message("Bot must be in voice channel", ephemeral=True, delete_after=3) + await interaction.response.send_message("Bot must be in a voice channel.", ephemeral=True, delete_after=3) @commands.cooldown(rate=1, per=1) @commands.guild_only() @app_commands.command(name="upload") async def upload_audio(self, interaction: discord.Interaction, mp3_file: discord.Attachment): """ - Upload audio file to soundboard + Upload an audio file to the soundboard. """ await interaction.response.send_message("Processing file...") if not mp3_file.filename.endswith(".mp3"): - await interaction.edit_original_response(content='Audio files must have .mp3 format') + await interaction.edit_original_response(content="Only .mp3 files are allowed.") return - mp3_file_bytes = await mp3_file.read() - result = Endpoints.upload_audio(interaction.guild_id, mp3_file.filename, mp3_file_bytes) + file_bytes = await mp3_file.read() + result = Endpoints.upload_audio(interaction.guild_id, mp3_file.filename, file_bytes) await interaction.edit_original_response(content=result) - async def __search_tracks( - self, search_phrase: str, guild_id: str - ) -> tuple[wavelink.Playable | wavelink.Playlist, int]: + async def __search_tracks(self, search: str, guild_id: int) -> tuple[wavelink.Playable | wavelink.Playlist, int]: """ - Decides which type of track should be used based on search phrase - Args: - search_phrase (str): text input from discord command user - - Returns: - tuple[list[wavelink.Playable], int]: tuple with list of tracks in case of playlist with start_time = 0,\n - list with single track and start time otherwise. - + Search for tracks on YouTube or the soundboard. """ - start_time: int = 0 - tracks = None - youtube_playlist_regex = re.search(r"list=([^#\&\?]*).*", search_phrase) - - # Check if it's a YouTube playlist... - if youtube_playlist_regex and youtube_playlist_regex.groups(): - # Build safe url to avoid errors - safe_url = f'https://www.youtube.com/playlist?list={youtube_playlist_regex.groups()[0]}' - search_result = await wavelink.Playable.search(safe_url) - if isinstance(search_result, wavelink.Playlist): - tracks = search_result - return tracks, start_time + start_time = 0 + youtube_playlist_regex = re.search(r"list=([^#&?]*)", search) + + if youtube_playlist_regex: + playlist_id = youtube_playlist_regex.group(1) + result = await wavelink.Playable.search(f"https://www.youtube.com/playlist?list={playlist_id}") + if isinstance(result, wavelink.Playlist): + return result, start_time raise UnexpectedPlayableType - # ...or track in the soundboard... - if search_phrase.isdecimal(): - sound_id = int(search_phrase) - guild_soundboard = Endpoints.get_soundboard(guild_id) - if guild_soundboard and sound_id <= len(guild_soundboard): - file_name = guild_soundboard[sound_id - 1] - file_path = f'sounds/{guild_id}/{file_name}' - search_result = await wavelink.Pool.fetch_tracks(file_path) - tracks = search_result[0] - return tracks, start_time + if search.isdigit(): + soundboard = Endpoints.get_soundboard(guild_id) + if soundboard and int(search) <= len(soundboard): + file_name = soundboard[int(search) - 1] + result = await wavelink.Pool.fetch_tracks(f"sounds/{guild_id}/{file_name}") + return result[0], start_time raise SoundboardTrackNotFound - # ...otherwise search given phrase on youtube - # Check if start time was passed - start_time_regex = re.search(r"(?:[\?&])?t=([0-9]+)", search_phrase) - if start_time_regex and start_time_regex.groups()[0]: - start_time = int(start_time_regex.groups()[0]) * 1000 - - # Build safe url to avoid errors - video_id_regex = re.search( - r"youtu(?:be\.com\/watch\?[^\s]*v=|\.be\/)([\w\-\_]*)(&(amp;)?[\w\?=]*)?", search_phrase - ) - if video_id_regex and video_id_regex.groups()[0]: - safe_url = f'https://www.youtube.com/watch?v={video_id_regex.groups()[0]}' - search_result = await wavelink.Playable.search(safe_url) + video_id_regex = re.search(r"(?:youtu\.be/|youtube\.com/watch\?v=)([\w-]+)", search) + if video_id_regex: + video_id = video_id_regex.group(1) + result = await wavelink.Playable.search(f"https://www.youtube.com/watch?v={video_id}") else: - search_result = await wavelink.Playable.search(search_phrase) - - tracks = search_result[0] if search_result else [] + result = await wavelink.Playable.search(search) - if not tracks: + if not result: raise YoutubeTrackNotFound - return tracks, start_time + start_time_regex = re.search(r"(?:[?&])t=(\d+)", search) + if start_time_regex: + start_time = int(start_time_regex.group(1)) * 1000 + + return result[0], start_time - async def disconnect_player_if_alone_in_channel(self, player: discord.VoiceProtocol, delay: int = 2): + async def disconnect_player_if_alone_in_channel(self, player: AudioPlayer, delay: int = 2): """ - After delay checks if player is alone in voice channel. - If so, removes guild's player view and disconnects + Disconnect the player if it's alone in the voice channel after a delay. """ await asyncio.sleep(delay) - player: AudioPlayer = cast(AudioPlayer, player) - if player.channel and player.connected and len(player.channel.members) == 1: + if player.channel and len(player.channel.members) == 1: await self.__remove_view_and_disconnect_player(player) async def __remove_view_and_disconnect_player(self, player: AudioPlayer): """ - Removes view and disconnect player + Remove the view associated with the guild and disconnect the player. """ guild_id = player.guild.id + if guild_id in self.views: + self.views[guild_id].remove_view() + del self.views[guild_id] await player.disconnect() - if view := self.views.get(guild_id): - view.remove_view() - self.views.pop(guild_id) @commands.Cog.listener() - async def on_wavelink_track_end(self, payload: wavelink.TrackEndEventPayload) -> None: + async def on_wavelink_track_end(self, payload: wavelink.TrackEndEventPayload): """ - Callback used when player finished playing a track - Used only to update embed when all tracks finished playing since we'd use - on_wavelink_track_start otherwise + Triggered when a track finishes playing. """ player = cast(AudioPlayer, payload.player) view = self.views.get(player.guild.id) - # Due to relying on validation from Lavalink player.playing property may in some cases - # return True directly after skipping/stopping a track, so we wait a bit await asyncio.sleep(0.1) - if len(player.queue) == 0 and not player.playing: + if not player.queue and not player.playing: await player.disable_filters() await view.send_embed() diff --git a/src/discord_bot.py b/src/discord_bot.py index 6f9d27f..aaa4a0b 100644 --- a/src/discord_bot.py +++ b/src/discord_bot.py @@ -1,47 +1,51 @@ -import logging -import os -from types import TracebackType -from typing import Optional, Type - -import wavelink -from discord import Intents -from discord.ext import commands - - -class DiscordBot(commands.Bot): - """ - Expands default bot class functionality - """ - - def __init__(self): - intents = Intents.default() - intents.message_content = True - super().__init__( - command_prefix=commands.when_mentioned_or("/"), - description='The Boi is back', - intents=intents, - ) - - async def __aexit__( - self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> None: - # run cogs destructors if defined - for cog in self.cogs.values(): - if hasattr(cog, '__del__'): - cog.__del__() # since just using del doesnt guarantee that destructor runs in async resource - return await super().__aexit__(exc_type, exc_value, traceback) - - async def setup_hook(self) -> None: - """ - Connect to lavalink server - """ - node_url = f"{os.getenv('WAVELINK_URL')}:{os.getenv('WAVELINK_PORT')}" - node: wavelink.Node = wavelink.Node(uri=node_url, password=os.getenv('WAVELINK_PASSWORD')) - try: - await wavelink.Pool.connect(client=self, nodes=[node]) - except wavelink.exceptions.WavelinkException as err: - logging.warning("Could not connect to lavalink!") - logging.warning(err) +import logging +import os +from types import TracebackType +from typing import Optional, Type + +import wavelink +from discord import Intents +from discord.ext import commands + + +class DiscordBot(commands.Bot): + """ + Extends the default bot class functionality. + """ + + def __init__(self) -> None: + intents = Intents.default() + intents.message_content = True # Enables the bot to access message content. + super().__init__( + command_prefix=commands.when_mentioned_or("/"), + description="The Boi is back", + intents=intents, + ) + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + """ + Clean up resources and run destructors for cogs, if defined. + """ + for cog in self.cogs.values(): + if hasattr(cog, "__del__"): + cog.__del__() # Call explicitly to ensure cleanup. + await super().__aexit__(exc_type, exc_value, traceback) + + async def setup_hook(self) -> None: + """ + Connect to the Lavalink server during bot setup. + """ + node_url = f"{os.getenv('WAVELINK_URL')}:{os.getenv('WAVELINK_PORT')}" + node = wavelink.Node(uri=node_url, password=os.getenv('WAVELINK_PASSWORD')) + + try: + await wavelink.Pool.connect(client=self, nodes=[node]) + logging.info("Connected to Lavalink server successfully.") + except wavelink.exceptions.WavelinkException as err: + logging.warning("Could not connect to the Lavalink server.") + logging.warning(err) diff --git a/src/main.py b/src/main.py index 797ddc1..4b1cbba 100644 --- a/src/main.py +++ b/src/main.py @@ -14,24 +14,26 @@ from cogs.badura_cog import BaduraCog from discord_bot import DiscordBot -# Set up logger +# Logger setup logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S", handlers=[logging.FileHandler("info.log"), logging.StreamHandler()], ) -# Load env variables from .env file + +# Load environment variables and dependencies load_dotenv() -# Load static ffmpeg library static_ffmpeg.add_paths() -# Load opus library - depends on OS + +# Load Opus library based on OS if os.name == 'nt': discord.opus._load_default() elif os.name == 'posix': discord.opus.load_opus('libopus.so.0') + if not discord.opus.is_loaded(): - raise RuntimeError('Opus failed to load!') + raise RuntimeError('Failed to load Opus library.') class Bot: @@ -41,77 +43,77 @@ class Bot: def __init__(self) -> None: self.bot = DiscordBot() - self.create_cogs() - self.setup_events() + self._initialize_cogs() + self._initialize_events() - def create_cogs(self): + def _initialize_cogs(self): """ - Create Cogs + Initialize and configure all bot cogs. """ - self.bot_admin = AdminCog(self.bot) - self.audio = AudioCog(self.bot) - self.users_related = UserCog(self.bot) - self.badura = BaduraCog(self.bot) + self.admin_cog = AdminCog(self.bot) + self.audio_cog = AudioCog(self.bot) + self.user_cog = UserCog(self.bot) + self.badura_cog = BaduraCog(self.bot) - def setup_events(self): + def _initialize_events(self): """ - Setup events + Configure bot events. """ @self.bot.event async def on_ready(): """ - Event that occurrence one time when bot is ready to work + Event triggered when the bot is ready. """ - logging.info('Logged in as %s (ID: %d)\n-----------\n', self.bot.user, self.bot.user.id) + logging.info("Logged in as %s (ID: %d)", self.bot.user, self.bot.user.id) + logging.info("Bot is ready and operational.") @self.bot.event async def on_voice_state_update(member: discord.Member, before: discord.VoiceState, after: discord.VoiceState): """ - Event that occurrence whenever someone leaves/joins vc + Event triggered when a user's voice state changes. """ player = member.guild.voice_client if player: - await self.audio.disconnect_player_if_alone_in_channel(player, 10) + await self.audio_cog.disconnect_player_if_alone_in_channel(player, 10) async def run(self): """ - Main boot start function + Main function to start the bot. """ discord.utils.setup_logging(level=logging.WARNING, root=False) - # Load cogs + async with self.bot: - await self.bot.add_cog(self.bot_admin) - await self.bot.add_cog(self.users_related) - await self.bot.add_cog(self.audio) - await self.bot.add_cog(self.badura) - await self.bot.start(os.getenv('BOT_TOKEN')) + await self.bot.add_cog(self.admin_cog) + await self.bot.add_cog(self.user_cog) + await self.bot.add_cog(self.audio_cog) + await self.bot.add_cog(self.badura_cog) + await self.bot.start(os.getenv("BOT_TOKEN")) def sigterm_handler(signum, frame): """ - Handler for SIGTERM signal + Handler for SIGTERM signal. """ - logging.info('Recieved SIGTERM signal.\nExiting...') - sys.exit(1) + logging.info("Received SIGTERM signal. Shutting down gracefully.") + sys.exit(0) def main(): """ - Main function + Entry point of the script. """ - logging.info('Info log...') - logging.warning('Warning log..') - logging.error('error log..') - + logging.info("Starting bot...") + + # Instantiate and run the bot bot = Bot() try: asyncio.run(bot.run()) except KeyboardInterrupt: - logging.info('Recieved interrupt signal.\nExiting...') - sys.exit(1) + logging.info("Received interrupt signal. Exiting...") + sys.exit(0) -if __name__ == '__main__': +if __name__ == "__main__": signal.signal(signal.SIGTERM, sigterm_handler) main() diff --git a/src/utils/decorators.py b/src/utils/decorators.py index 3a21813..7942e95 100644 --- a/src/utils/decorators.py +++ b/src/utils/decorators.py @@ -9,20 +9,22 @@ def bot_is_in_voice_channel_check(func): """ - Decorator used to check whether bot is in voice channel. - Executes decorated function only if check passed + Decorator to check whether the bot is in a voice channel. + Executes the decorated function only if the check passes. """ - - @functools.wraps(func) # preserves signature of the function so it can be used in commands + @functools.wraps(func) async def decorator(*args, **kwargs): interaction = next((arg for arg in args if isinstance(arg, discord.Interaction)), None) if interaction is None: - raise ValueError("Interaction is None") + raise ValueError("No discord.Interaction found in arguments.") player = cast(AudioPlayer, interaction.guild.voice_client) if not player: - await interaction.response.send_message("Bot not in voice channel", delete_after=3, ephemeral=True) + await interaction.response.send_message( + "The bot is not in a voice channel.", delete_after=3, ephemeral=True + ) return + await func(*args, **kwargs) return decorator @@ -30,22 +32,21 @@ async def decorator(*args, **kwargs): def user_is_in_voice_channel_check(func): """ - Decorator used to check whether player is in voice channel. - Executes decorated function only if check passed + Decorator to check whether the user is in a voice channel. + Executes the decorated function only if the check passes. """ - - @functools.wraps(func) # preserves signature of the function so it can be used in commands + @functools.wraps(func) async def decorator(*args, **kwargs): interaction = next((arg for arg in args if isinstance(arg, discord.Interaction)), None) if interaction is None: - raise ValueError("Interaction is None") + raise ValueError("No discord.Interaction found in arguments.") - voice_channel = interaction.user.voice - if not voice_channel: + if not interaction.user.voice: await interaction.response.send_message( - "You can't control the bot because you're not in a voice channel", delete_after=3, ephemeral=True + "You must be in a voice channel to control the bot.", delete_after=3, ephemeral=True ) return + await func(*args, **kwargs) return decorator @@ -53,22 +54,24 @@ async def decorator(*args, **kwargs): def user_bot_in_same_channel_check(func): """ - Decorator used to check whether player is able to use audio player controls. - (Must be in the same voice channel) - Executes decorated function only if check passed + Decorator to check whether the user is in the same voice channel as the bot. + Executes the decorated function only if the check passes. """ - - @functools.wraps(func) # preserves signature of the function so it can be used in commands + @functools.wraps(func) async def decorator(*args, **kwargs): interaction = next((arg for arg in args if isinstance(arg, discord.Interaction)), None) if interaction is None: - raise ValueError("Interaction is None") + raise ValueError("No discord.Interaction found in arguments.") - player, voice_channel = cast(AudioPlayer, interaction.guild.voice_client), interaction.user.voice - if not (player and voice_channel and player.channel.id == voice_channel.channel.id): - msg = "You can't control the bot because you're not on the same voice channel" - await interaction.response.send_message(msg, delete_after=3, ephemeral=True) + player = cast(AudioPlayer, interaction.guild.voice_client) + user_voice = interaction.user.voice + + if not (player and user_voice and player.channel.id == user_voice.channel.id): + await interaction.response.send_message( + "You must be in the same voice channel as the bot to control it.", delete_after=3, ephemeral=True + ) return + await func(*args, **kwargs) return decorator @@ -76,20 +79,22 @@ async def decorator(*args, **kwargs): def is_playing_check(func): """ - Decorator for checking if bot is playing something. - Executes decorated function only if check passed + Decorator to check whether the bot is currently playing audio. + Executes the decorated function only if the check passes. """ - - @functools.wraps(func) # preserves signature of the function so it can be used in commands + @functools.wraps(func) async def decorator(*args, **kwargs): interaction = next((arg for arg in args if isinstance(arg, discord.Interaction)), None) if interaction is None: - raise ValueError("Interaction is None") + raise ValueError("No discord.Interaction found in arguments.") player = cast(AudioPlayer, interaction.guild.voice_client) - if not player.playing: - await interaction.response.send_message("Nothing is playing right now", delete_after=3, ephemeral=True) + if not player or not player.playing: + await interaction.response.send_message( + "Nothing is playing right now.", delete_after=3, ephemeral=True + ) return + await func(*args, **kwargs) return decorator @@ -97,25 +102,26 @@ async def decorator(*args, **kwargs): def button_cooldown(func): """ - Decorator for setting cooldown for buttons.\n - Executes decorated method if button is off cooldown.\n - The object of the decorated method has to have a property named _cooldown - which determines the cooldown of the buttons. + Decorator to enforce cooldowns for buttons. + The object of the decorated method must have a `_cooldown` property. """ - - @functools.wraps(func) # preserves signature of the function so it can be used in commands + @functools.wraps(func) async def decorator(self, *args, **kwargs): interaction = next((arg for arg in args if isinstance(arg, discord.Interaction)), None) if interaction is None: - raise ValueError("Interaction is None") - if not hasattr(self, '_cooldown'): - raise ValueError("Object has no attribute named _cooldown") - interaction.message.channel = interaction.channel + raise ValueError("No discord.Interaction found in arguments.") + + if not hasattr(self, "_cooldown"): + raise ValueError("The object does not have a `_cooldown` property.") + bucket = self._cooldown.get_bucket(interaction.message) retry = bucket.update_rate_limit() if retry: - await interaction.response.send_message("🤠 Slow down partner! 🤠", delete_after=3, ephemeral=True) + await interaction.response.send_message( + "🤠 Slow down, partner! 🤠", delete_after=3, ephemeral=True + ) return + await func(self, *args, **kwargs) return decorator @@ -123,18 +129,20 @@ async def decorator(self, *args, **kwargs): def run_threadsafe(func): """ - Decorator for runing coroutine as threadsafe.\n - Decorated corouting becomes a function so it doesnt have to be awaited + Decorator to run a coroutine in a thread-safe manner. + The decorated function will be run as a thread-safe coroutine. """ - def decorator(*args, **kwargs): - loop = next((arg for arg in args if isinstance(arg, asyncio.AbstractEventLoop)), None) + loop = next( + (arg for arg in args if isinstance(arg, asyncio.AbstractEventLoop)), + next((kwarg for kwarg in kwargs.values() if isinstance(kwarg, asyncio.AbstractEventLoop)), None) + ) if not loop: - loop = next((kwarg for kwarg in kwargs.values() if isinstance(kwarg, asyncio.AbstractEventLoop)), None) - if loop is None: - raise ValueError("There is no EventLoop in argument of decorated function") + raise ValueError("No asyncio.AbstractEventLoop found in arguments.") + if not inspect.iscoroutinefunction(func): - raise TypeError('Decorated function is not a coroutine') + raise TypeError("The decorated function must be a coroutine.") + coro = func(*args, **kwargs) asyncio.run_coroutine_threadsafe(coro, loop=loop) diff --git a/src/utils/endpoints.py b/src/utils/endpoints.py index da879b8..97ddc39 100644 --- a/src/utils/endpoints.py +++ b/src/utils/endpoints.py @@ -1,51 +1,73 @@ -from __future__ import annotations - -import base64 -import logging -import os - -import dotenv -import requests - -dotenv.load_dotenv() -server = f"http://{os.getenv('SERVER_IP')}:{os.getenv('SERVER_PORT')}" - - -class Endpoints: - """ - HTTP communication with audio server - """ - - @staticmethod - def get_soundboard(guild_id) -> list[str] | None: - """ - Returns list of sounds located on the server for giver id - """ - url = server + f"/{os.getenv('SERVER_ENDPOINT')}/{guild_id}" - response = requests.get(url=url, timeout=2) - if response.status_code != 200: - logging.warning("Server responded with code: %d", response.status_code) - return None - return response.json()["files"] - - @staticmethod - def upload_audio(guild_id: int, file_name: str, file_data: bytes) -> str: - """ - Upload bytes data to server - """ - b64_code = base64.b64encode(file_data) - headers = {'Content-Type': 'application/json'} - mp3_json = {"file_name": file_name, "file_data": b64_code.decode('utf-8')} - url = server + f"/{os.getenv('SERVER_ENDPOINT')}/{guild_id}" - message = '' - try: - response = requests.post(url=url, headers=headers, json=mp3_json, timeout=2) - if response.status_code == 200: - message = 'Upload successful!' - else: - message = f'Upload failed, server code:{response.status_code}' - except requests.exceptions.ConnectTimeout: - message = 'Upload failed, request timed out.' - except requests.exceptions.ReadTimeout: - message = 'Upload failed, request timed out.' - return message +from __future__ import annotations + +import base64 +import logging +import os +from typing import Optional + +import dotenv +import requests + +# Load environment variables +dotenv.load_dotenv() + +SERVER_URL = f"http://{os.getenv('SERVER_IP')}:{os.getenv('SERVER_PORT')}" +ENDPOINT = os.getenv('SERVER_ENDPOINT') + + +class Endpoints: + """ + Handles HTTP communication with the audio server. + """ + + @staticmethod + def get_soundboard(guild_id: int) -> Optional[list[str]]: + """ + Retrieves a list of sound files from the server for the given guild ID. + + Args: + guild_id (int): The ID of the guild. + + Returns: + Optional[list[str]]: A list of sound file names or None if the request fails. + """ + url = f"{SERVER_URL}/{ENDPOINT}/{guild_id}" + try: + response = requests.get(url=url, timeout=2) + if response.status_code == 200: + return response.json().get("files", []) + logging.warning("Server responded with status code: %d", response.status_code) + except requests.RequestException as e: + logging.error("Error fetching soundboard: %s", e) + return None + + @staticmethod + def upload_audio(guild_id: int, file_name: str, file_data: bytes) -> str: + """ + Uploads audio data to the server. + + Args: + guild_id (int): The ID of the guild. + file_name (str): The name of the file to upload. + file_data (bytes): The file data in bytes. + + Returns: + str: A message indicating the result of the upload operation. + """ + url = f"{SERVER_URL}/{ENDPOINT}/{guild_id}" + b64_code = base64.b64encode(file_data).decode('utf-8') + headers = {'Content-Type': 'application/json'} + payload = {"file_name": file_name, "file_data": b64_code} + + try: + response = requests.post(url=url, headers=headers, json=payload, timeout=2) + if response.status_code == 200: + return "Upload successful!" + return f"Upload failed, server responded with status code: {response.status_code}" + except requests.ConnectTimeout: + return "Upload failed, connection timed out." + except requests.ReadTimeout: + return "Upload failed, server took too long to respond." + except requests.RequestException as e: + logging.error("Error during file upload: %s", e) + return "Upload failed due to an unexpected error." diff --git a/src/views/audio_player_view.py b/src/views/audio_player_view.py index 921cf08..33f7c2c 100644 --- a/src/views/audio_player_view.py +++ b/src/views/audio_player_view.py @@ -2,12 +2,12 @@ import asyncio import datetime +from dataclasses import dataclass from typing import TYPE_CHECKING, cast import discord from discord.ext import commands from audio_player import AudioPlayer - from utils.decorators import ( button_cooldown, is_playing_check, @@ -17,274 +17,336 @@ if TYPE_CHECKING: from main import DiscordBot - -NOTHING_IN_QUEUE_PLACEHOLDER = 'Nothing in current queue.' -MAX_SELECT_LEN = 25 - +@dataclass +class QueueDisplay: + """Configuration for queue display""" + MAX_ITEMS = 25 + PLACEHOLDER = 'Nothing in current queue.' + MAX_PREVIEW_ITEMS = 10 class AudioPlayerView(discord.ui.View): - """ - View class for controlling audio player through view - """ + """View class for controlling audio player through Discord UI""" def __init__(self, bot: DiscordBot, text_channel: discord.TextChannel): super().__init__(timeout=None) - self.bot: DiscordBot = bot - self.text_channel: discord.TextChannel = text_channel + self.bot = bot + self.text_channel = text_channel self.message_handle: discord.Message | None = None - self.queue_page: int = 0 - self._cooldown = commands.CooldownMapping.from_cooldown(rate=1, per=1, type=commands.BucketType.channel) + self.queue_page = 0 + self._cooldown = commands.CooldownMapping.from_cooldown( + rate=1, + per=1, + type=commands.BucketType.channel + ) + self._setup_buttons() + self._setup_queue_select() - def __del__(self): - try: - coro = self.message_handle.delete() - asyncio.run_coroutine_threadsafe(coro, self.bot.loop) - except AttributeError: - pass + def _setup_buttons(self): + """Initialize button layouts and styles""" + # Row 0: Playback controls + self.previous_button = self._create_button( + '◀◀ Prev', discord.ButtonStyle.blurple, self.previous_callback, row=0 + ) + self.pause_button = self._create_button( + '❚❚ Pause', discord.ButtonStyle.blurple, self.pause_callback, row=0 + ) + self.skip_button = self._create_button( + 'â–ļâ–ļ Skip', discord.ButtonStyle.blurple, self.skip_callback, row=0 + ) + self.stop_button = self._create_button( + '▮ Stop', discord.ButtonStyle.red, self.stop_callback, row=0 + ) + self.filter_button = self._create_button( + 'āļž', discord.ButtonStyle.grey, self.filter_callback, row=0 + ) - def remove_view(self): - """ - Removes embed with audio player information - """ + # Row 2: Navigation controls + self.previous_page_button = self._create_button( + '◀', discord.ButtonStyle.grey, self.previous_page_callback, row=2, disabled=True + ) + self.next_page_button = self._create_button( + 'â–ļ', discord.ButtonStyle.grey, self.next_page_callback, row=2, disabled=True + ) + + def _setup_queue_select(self): + """Initialize queue selection dropdown""" + self.queue_select = discord.ui.Select( + options=[discord.SelectOption(label=QueueDisplay.PLACEHOLDER)], + placeholder=QueueDisplay.PLACEHOLDER, + max_values=1, + min_values=1, + disabled=True, + row=1 + ) + self.queue_select.callback = self.queue_select_callback + self.add_item(self.queue_select) + + def _create_button(self, label: str, style: discord.ButtonStyle, callback, **kwargs) -> discord.ui.Button: + """Create and configure a button with the given parameters""" + button = discord.ui.Button(label=label, style=style, **kwargs) + button.callback = callback + self.add_item(button) + return button + + async def cleanup(self): + """Clean up resources and remove the view""" + if self.message_handle: + try: + await self.message_handle.delete() + except discord.NotFound: + pass + self.stop() + self.clear_items() + + def __del__(self): if self.message_handle: coro = self.message_handle.delete() - self.stop() - self.clear_items() asyncio.run_coroutine_threadsafe(coro, self.bot.loop) async def send_embed(self): - """ - Removes last message and sends new one to keep it on the bottom of the chat\n - """ - embed = await self.__prepare_embed() - self._update_buttons_state() + """Update the embed message with current player state""" + embed = await self._create_embed() + self._update_ui_state() + if self.message_handle: await self.message_handle.delete() - self.message_handle = await self.text_channel.send(content=None, embed=embed, view=self) + self.message_handle = await self.text_channel.send(embed=embed, view=self) + + def _format_duration(self, milliseconds: float) -> str: + """Format milliseconds duration into human-readable string""" + total_seconds = milliseconds / 1000 + minutes, seconds = divmod(total_seconds, 60) + hours, minutes = divmod(minutes, 60) + + if hours > 0: + return f'⌛ {int(hours):02d} hr {int(minutes):02d} min' + return f'⌛ {int(minutes):02d} min {int(seconds):02d} s' + + def _format_queue_preview(self, player: AudioPlayer) -> tuple[str, str]: + """Format queue preview and duration""" + if not player.queue: + return '', '' + + total_duration = sum(track.length for track in player.queue) + queue_duration = self._format_duration(total_duration) + + if len(player.queue) > QueueDisplay.MAX_PREVIEW_ITEMS: + preview_tracks = [ + f"{i + 1}. {track.title}" + for i, track in enumerate(player.queue[:QueueDisplay.MAX_PREVIEW_ITEMS]) + ] + remaining = len(player.queue) - QueueDisplay.MAX_PREVIEW_ITEMS + preview_tracks.append(f'... and {remaining} more.') + else: + preview_tracks = [ + f"{i + 1}. {track.title}" + for i, track in enumerate(player.queue) + ] + + return '\n'.join(preview_tracks + [f'{queue_duration}']), queue_duration + + async def _create_embed(self) -> discord.Embed: + """Create embed with current player information""" + player = cast(AudioPlayer, self.text_channel.guild.voice_client) + embed = discord.Embed( + title='The Boi', + color=0x00FF00, + timestamp=datetime.datetime.now(datetime.timezone.utc) + ) + + queue_preview, _ = self._format_queue_preview(player) + if queue_preview: + embed.add_field(name='Queue:', value=queue_preview, inline=False) + + if player.current: + duration = self._format_duration(player.current.length) + embed.add_field( + name='Now Playing:', + value=f'{player.current.title}\n{duration}', + inline=True + ) + if player.current.artwork: + embed.set_thumbnail(url=player.current.artwork) + else: + embed.add_field( + name='Nothing is playing right now', + value=':(', + inline=True + ) + + embed.add_field(name='', value='▁' * 15, inline=False) + embed.set_footer( + text='2137', + icon_url='https://media.tenor.com/mc3OyxhLazUAAAAM/doggo-doge.gif' + ) + return embed + + def _update_ui_state(self): + """Update all UI elements based on current player state""" + player = cast(AudioPlayer, self.text_channel.guild.voice_client) + + # Update button states + self._update_playback_buttons(player) + self._update_navigation_buttons(player) + self._update_queue_select(player) + + def _update_playback_buttons(self, player: AudioPlayer): + """Update state of playback control buttons""" + self.previous_button.disabled = not len(player.queue.history) > 1 + self.pause_button.label = 'â–ļ Resume' if player.paused else '❚❚ Pause' + self.pause_button.disabled = not player.playing + self.skip_button.disabled = not player.playing + self.stop_button.disabled = not player.playing + self.filter_button.disabled = not player.playing + + # Update filter button appearance + dancing_emoji = discord.PartialEmoji.from_str('') + self.filter_button.label = '' if player.filters_applied else 'āļž' + self.filter_button.emoji = dancing_emoji if player.filters_applied else None + + def _update_navigation_buttons(self, player: AudioPlayer): + """Update state of navigation buttons""" + queue_len = len(player.queue) + history_len = len(player.queue.history) + + max_pages = max(queue_len - 1, 0) // QueueDisplay.MAX_ITEMS + min_pages = -(max(history_len - 1, 0) // QueueDisplay.MAX_ITEMS) - 1 if history_len else 0 + + self.queue_page = min(max_pages, max(min_pages, self.queue_page)) + self.previous_page_button.disabled = self.queue_page <= min_pages + self.next_page_button.disabled = self.queue_page >= max_pages + + def _update_queue_select(self, player: AudioPlayer): + """Update queue selection dropdown""" + if self.queue_page >= 0: + self._update_current_queue_select(player) + else: + self._update_history_queue_select(player) + + def _update_current_queue_select(self, player: AudioPlayer): + """Update dropdown for current queue""" + start_idx = self.queue_page * QueueDisplay.MAX_ITEMS + end_idx = (self.queue_page + 1) * QueueDisplay.MAX_ITEMS + + if not player.queue: + self.queue_select.disabled = True + self.queue_select.options = [discord.SelectOption(label=QueueDisplay.PLACEHOLDER)] + self.queue_select.placeholder = QueueDisplay.PLACEHOLDER + return + + self.queue_select.disabled = False + self.queue_select.options = [ + discord.SelectOption(label=f'{i + 1}. {track.title}', value=str(i)) + for i, track in enumerate(player.queue[start_idx:end_idx]) + ] + self.queue_select.placeholder = ( + f'Displaying: {start_idx + 1}-{min(end_idx, len(player.queue))} (current queue)' + ) - @discord.ui.button(label='◀◀ Prev', style=discord.ButtonStyle.blurple, row=0) + def _update_history_queue_select(self, player: AudioPlayer): + """Update dropdown for history queue""" + history = list(player.queue.history)[::-1] + start_idx = (-self.queue_page - 1) * QueueDisplay.MAX_ITEMS + end_idx = -self.queue_page * QueueDisplay.MAX_ITEMS + + if not history: + self.queue_select.disabled = True + self.queue_select.options = [discord.SelectOption(label=QueueDisplay.PLACEHOLDER)] + self.queue_select.placeholder = QueueDisplay.PLACEHOLDER + return + + self.queue_select.disabled = False + self.queue_select.options = [ + discord.SelectOption( + label=f'{i + 1}. {track.title}', + value=str(len(history) - 1 - i) + ) + for i, track in enumerate(history[start_idx:end_idx]) + ] + self.queue_select.placeholder = ( + f'Displaying: {start_idx + 1}-{min(end_idx, len(history))} (history queue)' + ) + + # Button Callbacks @user_bot_in_same_channel_check @button_cooldown - async def previous_button(self, interaction: discord.Interaction, button: discord.ui.Button): - """ - Play previous track. - """ + async def previous_callback(self, interaction: discord.Interaction): + """Play previous track""" player = cast(AudioPlayer, interaction.guild.voice_client) await player.play_previous_track() await interaction.response.defer() - @discord.ui.button(label='❚❚ Pause', style=discord.ButtonStyle.blurple, row=0) @user_bot_in_same_channel_check @button_cooldown - async def pause_button(self, interaction: discord.Interaction, button: discord.ui.Button): - """ - Pause/resume the player on button press - """ + async def pause_callback(self, interaction: discord.Interaction): + """Toggle pause state""" player = cast(AudioPlayer, interaction.guild.voice_client) await player.pause(not player.paused) await interaction.response.defer() await self._update_embed() - @discord.ui.button(label='â–ļâ–ļ Skip', style=discord.ButtonStyle.blurple, row=0) @user_bot_in_same_channel_check @is_playing_check @button_cooldown - async def skip_button(self, interaction: discord.Interaction, button: discord.ui.Button): - """ - Skip track on button press - """ + async def skip_callback(self, interaction: discord.Interaction): + """Skip current track""" player = cast(AudioPlayer, interaction.guild.voice_client) await player.skip() await interaction.response.defer() - @discord.ui.button(label='▮ Stop', style=discord.ButtonStyle.red, row=0) @user_bot_in_same_channel_check @is_playing_check @button_cooldown - async def stop_button(self, interaction: discord.Interaction, button: discord.ui.Button): - """ - Stop track on button press - """ + async def stop_callback(self, interaction: discord.Interaction): + """Stop playback and clear queue""" player = cast(AudioPlayer, interaction.guild.voice_client) player.queue.clear() await player.skip() await player.disable_filters() await interaction.response.defer() - @discord.ui.button(label='āļž', style=discord.ButtonStyle.grey, row=0) @user_bot_in_same_channel_check @is_playing_check - async def filter_button(self, interaction: discord.Interaction, button: discord.ui.Button): - """ - Filter to toggle audio filters - """ + async def filter_callback(self, interaction: discord.Interaction): + """Toggle audio filters""" player = cast(AudioPlayer, interaction.guild.voice_client) await player.toggle_nightcore_filter() await interaction.response.defer() await self._update_embed() - @discord.ui.select( - cls=discord.ui.Select, - options=[discord.SelectOption(label=NOTHING_IN_QUEUE_PLACEHOLDER)], - placeholder=NOTHING_IN_QUEUE_PLACEHOLDER, - max_values=1, - min_values=1, - disabled=True, - row=1, - ) @user_bot_in_same_channel_check - async def queue_select(self, interaction: discord.Interaction, select: discord.ui.Select): - """ - Allows selecting song from the queue and playing it - """ + async def queue_select_callback(self, interaction: discord.Interaction): + """Handle queue selection""" player = cast(AudioPlayer, interaction.guild.voice_client) - index = int(select.values[0]) - if self.queue_page > 0: + index = int(self.queue_select.values[0]) + + if self.queue_page >= 0: await player.play_track_from_queue(index) else: await player.play_track_from_history(index) - + await interaction.response.defer() - @discord.ui.button(label='◀', style=discord.ButtonStyle.grey, row=2, disabled=True) @user_bot_in_same_channel_check - async def previous_page(self, interaction: discord.Interaction, button: discord.ui.Button): - """ - Loads previous page in the select window - """ + async def previous_page_callback(self, interaction: discord.Interaction): + """Show previous page of queue""" self.queue_page -= 1 - self._prepare_track_selection_list() await interaction.response.defer() await self._update_embed() - @discord.ui.button(label='â–ļ', style=discord.ButtonStyle.grey, row=2, disabled=True) @user_bot_in_same_channel_check - async def next_page(self, interaction: discord.Interaction, button: discord.ui.Button): - """ - Loads next page in the select window - """ + async def next_page_callback(self, interaction: discord.Interaction): + """Show next page of queue""" self.queue_page += 1 - self._prepare_track_selection_list() await interaction.response.defer() await self._update_embed() - async def __prepare_embed(self) -> discord.Embed: - """ - returns embed based on current state of voice_client - """ - # Calculate queue time length - total_seconds = 0 - player = cast(AudioPlayer, self.text_channel.guild.voice_client) - for i, track in enumerate(player.queue): - total_seconds += track.length / 1000 - minutes = divmod(total_seconds, 60)[0] - hours, minutes = divmod(minutes, 60) - queue_time = f'⌛ {int(hours):02d} hr {int(minutes):02d} min' - - now_playing = player.current - if now_playing: - minutes, secs = divmod(now_playing.length / 1000, 60) - now_playing_time = f'⌛ {int(minutes):02d} min {int(secs):02d} s' - - # Prepare queue list - queue_preview = '' - if len(player.queue) > 10: - for i in range(10): - queue_preview += f"{i + 1}. {str(player.queue[i].title)}\n" - queue_preview += f'... and {len(player.queue) - 10} more.\n{queue_time}\n' - else: - for i, track in enumerate(player.queue): - queue_preview += f"{i + 1}. {str(track.title)}\n" - queue_preview += f'{queue_time}\n' - - # Create new embed - embed = discord.Embed(title='The Boi', color=0x00FF00, timestamp=datetime.datetime.now(datetime.timezone.utc)) - if len(player.queue) > 0: - embed.add_field(name='Queue:', value=queue_preview, inline=False) - if now_playing: - embed.add_field(name='Now Playing:', value=f'{now_playing.title}\n{now_playing_time}', inline=True) - thumbnail = now_playing.artwork - if thumbnail: - embed.set_thumbnail(url=thumbnail) - else: - embed.add_field(name='Nothing is playing right now', value=':(', inline=True) - embed.add_field(name='', value='▁▁▁▁▁▁▁▁▁▁▁▁▁▁▁', inline=False) - embed.set_footer(text='2137', icon_url='https://media.tenor.com/mc3OyxhLazUAAAAM/doggo-doge.gif') - return embed - async def _update_embed(self): - """ - Updates embed contents (does not remove / send) - """ + """Update existing embed message""" if not self.text_channel: return - embed = await self.__prepare_embed() - self._update_buttons_state() + embed = await self._create_embed() + self._update_ui_state() if self.message_handle: - await self.message_handle.edit(view=self, embed=embed) - - def _prepare_track_selection_list(self): - """ - calculates state of select window and related buttons - """ - player = cast(AudioPlayer, self.text_channel.guild.voice_client) - queue_len = len(player.queue) - history_len = len(player.queue.history) - max_pages = max(queue_len - 1, 0) // MAX_SELECT_LEN - min_pages = -(max(history_len - 1, 0) // MAX_SELECT_LEN) - 1 if history_len else 0 - self.queue_page = min(max_pages, max(min_pages, self.queue_page)) # so page is correct on queue lenth change - if self.queue_page >= 0: - first_index = self.queue_page * MAX_SELECT_LEN + 1 - last_index = (self.queue_page + 1) * MAX_SELECT_LEN - else: - first_index = (-self.queue_page - 1) * MAX_SELECT_LEN + 1 - last_index = -self.queue_page * MAX_SELECT_LEN - - self.previous_page.disabled = self.queue_page <= min_pages - self.next_page.disabled = self.queue_page >= max_pages - # fmt: off - self.queue_select.disabled = ( - not player.queue and self.queue_page >= 0) or ( - not player.queue.history and self.queue_page < 0 - ) - # fmt: on - if player.queue and self.queue_page >= 0: # display current queue - self.queue_select.options = [ - discord.SelectOption(label=f'{index +1}. {track.title}', value=str(index)) - for index, track in enumerate(player.queue) - if index + 1 >= first_index and index < last_index - ] - self.queue_select.placeholder = ( - f'Displaying: {first_index}-{min(last_index,len(player.queue))} (current queue)' - ) - elif self.queue_page < 0: # display history queue - self.queue_select.options = [ - discord.SelectOption(label=f'{index +1}. {track.title}', value=str(history_len - 1 - index)) - for index, track in enumerate(list(player.queue.history)[::-1]) - if index + 1 >= first_index and index < last_index - ] - self.queue_select.placeholder = ( - f'Displaying: {first_index}-{min(last_index,len(player.queue.history))} (history queue)' - ) - else: - self.queue_select.options = [discord.SelectOption(label=NOTHING_IN_QUEUE_PLACEHOLDER)] - self.queue_select.placeholder = NOTHING_IN_QUEUE_PLACEHOLDER - - def _update_buttons_state(self): - """ - calculates state of view - """ - dancing_black_man = discord.PartialEmoji.from_str('') - amogus = 'āļž' - - player = cast(AudioPlayer, self.text_channel.guild.voice_client) - self.previous_button.disabled = not len(player.queue.history) > 1 - self.pause_button.label = 'â–ļ Resume' if player.paused else '❚❚ Pause' - self.pause_button.disabled = not player.playing - self.skip_button.disabled = not player.playing - self.stop_button.disabled = not player.playing - self.filter_button.disabled = not player.playing - self.filter_button.label = amogus if not player.filters_applied else '' - self.filter_button.emoji = dancing_black_man if player.filters_applied else None - self._prepare_track_selection_list() + await self.message_handle.edit(view=self, embed=embed) \ No newline at end of file