From 71d3529a00977845f5ee41431c5ddac4041a5d96 Mon Sep 17 00:00:00 2001 From: John Toniutti Date: Mon, 9 Sep 2024 18:01:59 +0200 Subject: [PATCH] Bug fix --- Dockerfile | 120 +++++----- dsmusic/__main__.py | 176 ++++++++------- dsmusic/assistant/__init__.py | 0 dsmusic/assistant/cog.py | 217 ------------------ dsmusic/client.py | 250 +++++++++++---------- dsmusic/music/queue.py | 405 +++++++++++++++++----------------- pyproject.toml | 2 +- 7 files changed, 481 insertions(+), 689 deletions(-) delete mode 100644 dsmusic/assistant/__init__.py delete mode 100644 dsmusic/assistant/cog.py diff --git a/Dockerfile b/Dockerfile index ac54471..540bc27 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,61 +1,59 @@ -FROM python:3.12 as builder - -ENV PYTHONUNBUFFERED=1 \ - PYTHONDONTWRITEBYTECODE=1 \ - PIP_DISABLE_PIP_VERSION_CHECK=1 \ - PIP_DEFAULT_TIMEOUT=100 \ - POETRY_NO_INTERACTION=1 \ - POETRY_VIRTUALENVS_CREATE=0 \ - POETRY_CACHE_DIR=/tmp/poetry_cache \ - PATH="$PATH:/root/.poetry/bin" - -# Change workdir -WORKDIR /build - -# Install poetry -RUN pip install poetry - -# Add poetry files -COPY pyproject.toml poetry.lock ./ - -# Use poetry to resolve dependecies -RUN mkdir -p /build/wheels && poetry export -f requirements.txt --output /build/wheels/requirements.txt -# Compile dependencies -WORKDIR /build/wheels -RUN pip wheel --no-cache-dir --no-deps --wheel-dir /build/wheels -r /build/wheels/requirements.txt -# Install dependecies -RUN pip install --find-links /build/wheels -r /build/wheels/requirements.txt - -FROM python:3.12-slim - -ENV PYTHONFAULTHANDLER=1 \ - PYTHONUNBUFFERED=1 \ - PYTHONHASHSEED=random \ - PYTHONDONTWRITEBYTECODE=1 - -ENV DS_TOKEN "YOUR_DISCORD_TOKEN" # discord token from the developer portal -ENV DS_GUILD_ID "YOUR_GUILD_ID" # the guild id where the bot will be used -#ENV CF_CLIENT_ID "YOUR_CLIENT_ID" # cloudflare client id -#ENV CF_TOKEN "YOUR_CLOUDFLARE_TOKEN" # cloudflare token - -# Copy project dependecies -COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages - -# Change workdir -WORKDIR /bot - -# Setting up proper permissions: -RUN groupadd -r bot && useradd -d /bot -r -g bot bot \ - && mkdir -p /bot/config && chown bot:bot -R /bot - -# Run as non-root user -USER bot - -# Copy project -COPY --chown=bot:bot dsmusic/ /bot/dsmusic/ - -VOLUME ["/bot/config"] -VOLUME ["/bot/data"] - -# Commands to execute inside container -CMD ["python", "-O", "-B", "-m", "dsmusic"] +FROM python:3.12 as builder + +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 \ + PIP_DEFAULT_TIMEOUT=100 \ + POETRY_NO_INTERACTION=1 \ + POETRY_VIRTUALENVS_CREATE=0 \ + POETRY_CACHE_DIR=/tmp/poetry_cache \ + PATH="$PATH:/root/.poetry/bin" + +# Change workdir +WORKDIR /build + +# Install poetry +RUN pip install poetry + +# Add poetry files +COPY pyproject.toml poetry.lock ./ + +# Use poetry to resolve dependecies +RUN mkdir -p /build/wheels && poetry export -f requirements.txt --output /build/wheels/requirements.txt +# Compile dependencies +WORKDIR /build/wheels +RUN pip wheel --no-cache-dir --no-deps --wheel-dir /build/wheels -r /build/wheels/requirements.txt +# Install dependecies +RUN pip install --find-links /build/wheels -r /build/wheels/requirements.txt + +FROM python:3.12-slim + +ENV PYTHONFAULTHANDLER=1 \ + PYTHONUNBUFFERED=1 \ + PYTHONHASHSEED=random \ + PYTHONDONTWRITEBYTECODE=1 + +ENV DS_TOKEN "YOUR_DISCORD_TOKEN" # discord token from the developer portal +ENV DS_GUILD_ID "YOUR_GUILD_ID" # the guild id where the bot will be used + +# Copy project dependecies +COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages + +# Change workdir +WORKDIR /bot + +# Setting up proper permissions: +RUN groupadd -r bot && useradd -d /bot -r -g bot bot \ + && mkdir -p /bot/config && chown bot:bot -R /bot + +# Run as non-root user +USER bot + +# Copy project +COPY --chown=bot:bot dsmusic/ /bot/dsmusic/ + +VOLUME ["/bot/config"] +VOLUME ["/bot/data"] + +# Commands to execute inside container +CMD ["python", "-O", "-B", "-m", "dsmusic"] diff --git a/dsmusic/__main__.py b/dsmusic/__main__.py index 9ebce1f..abf88ed 100644 --- a/dsmusic/__main__.py +++ b/dsmusic/__main__.py @@ -1,85 +1,91 @@ -import os -import logging - -import discord - -from .client import Client - -try: - import uvloop -except ImportError: - pass -else: - uvloop.install() - - -def setup_discord_auxiliary_objects(): - intents = discord.Intents( - guilds=True, - members=True, - messages=True, - voice_states=True, - presences=True, - message_content=True - ) - - permissions = discord.Permissions( - send_messages=True, - read_messages=True, - - connect=True, - speak=True, - use_voice_activation=True, - - manage_threads=True, - send_messages_in_threads=True, - - attach_files=True, - embed_links=True, - ) - - return intents, permissions - - -def setup_logging(): - logging.basicConfig( - level=logging.INFO, - format="[%(asctime)s] [%(levelname)s] %(name)s: %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" - ) - logging.getLogger('discord').setLevel(logging.WARNING) - logging.getLogger('discord.client').setLevel(logging.INFO) - logging.getLogger('mafic').setLevel(logging.WARNING) - logging.getLogger('mafic.strategy').setLevel(logging.CRITICAL) - - -def main(): - setup_logging() - intents, permissions = setup_discord_auxiliary_objects() - - client = Client( - intents=intents, - command_prefix="!", - activity=discord.CustomActivity(name="Gressinbon"), - status=discord.Status.online, - mentions=discord.AllowedMentions.none(), - help_command=None - ) - - oauth_url = discord.utils.oauth_url( - client_id=839827510761488404, - guild=discord.Object(os.getenv("DS_GUILD_ID")), - permissions=permissions - ) - print(f"Bot URL: {oauth_url}") - - token = os.getenv("DS_TOKEN") - - if token: - client.run(token=token, log_handler=None) - else: - raise ValueError("Missing token") - - -if __name__ == "__main__": - main() +import asyncio +import os +import logging +import sys + +import discord + +from .client import Client + +try: + import uvloop +except ImportError: + pass +else: + uvloop.install() + + +def setup_discord_auxiliary_objects(): + intents = discord.Intents( + guilds=True, + members=True, + messages=True, + voice_states=True, + presences=True, + message_content=True + ) + + permissions = discord.Permissions( + send_messages=True, + read_messages=True, + + connect=True, + speak=True, + use_voice_activation=True, + + manage_threads=True, + send_messages_in_threads=True, + + attach_files=True, + embed_links=True, + ) + + return intents, permissions + + +def setup_logging(): + logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s] [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + logging.getLogger('discord').setLevel(logging.WARNING) + logging.getLogger('discord.client').setLevel(logging.INFO) + logging.getLogger('mafic').setLevel(logging.WARNING) + logging.getLogger('mafic.strategy').setLevel(logging.CRITICAL) + + +def main(): + setup_logging() + + if sys.platform == 'win32': + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + intents, permissions = setup_discord_auxiliary_objects() + + client = Client( + intents=intents, + command_prefix="!", + activity=discord.CustomActivity(name="Gressinbon"), + status=discord.Status.online, + mentions=discord.AllowedMentions.none(), + help_command=None + ) + + oauth_url = discord.utils.oauth_url( + client_id=839827510761488404, + guild=discord.Object(os.getenv("DS_GUILD_ID")), + permissions=permissions + ) + print(f"Bot URL: {oauth_url}") + + token = os.getenv("DS_TOKEN") + + if token: + client.run(token=token, log_handler=None) + else: + raise ValueError("Missing token") + + +if __name__ == "__main__": + main() diff --git a/dsmusic/assistant/__init__.py b/dsmusic/assistant/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/dsmusic/assistant/cog.py b/dsmusic/assistant/cog.py deleted file mode 100644 index c49ccb8..0000000 --- a/dsmusic/assistant/cog.py +++ /dev/null @@ -1,217 +0,0 @@ -import logging -from dataclasses import dataclass, field, asdict -from os import getenv -from typing import Any, Literal - -import aiohttp -import discord -from discord import app_commands -from discord.ext import commands -from yarl import URL - -logger = logging.getLogger('dsbot.assistant.cog') - -DEFAULT_MODEL = "@hf/thebloke/llama-2-13b-chat-awq" - - -@dataclass -class UnscopedPrompt: - prompt: str - raw: bool = field(default=False, init=False) - stream: bool = field(default=False, init=False) - max_tokens: int = field(default=256, init=False) - - -@dataclass -class Message: - content: str - role: Literal["user", "system", "assistant"] = "assistant" - - -@dataclass -class Response: - response: str - - -@app_commands.guild_only() -class Assistant(commands.Cog): - def __init__( - self, - bot: discord.Client, - cf_account_id: str, - cf_api_token: str, - session: aiohttp.ClientSession | None = None, - *, - rest_url: str | URL | None = None, - ): - self.bot = bot - - if session is None: - from orjson import dumps - self.session = aiohttp.ClientSession( - json_serialize=lambda obj: dumps(obj).decode("utf-8", errors="ignore") - ) - else: - self.session = session - - self._cf_account_id = cf_account_id - self._cf_api_token = cf_api_token - - if rest_url is None: - self.rest_url = URL(f"https://api.cloudflare.com/client/v4/accounts/{cf_account_id}/ai/run/") - else: - if isinstance(rest_url, URL): - self.rest_url = rest_url - else: - self.rest_url = URL(rest_url) - - self.headers = { - "Authorization": f"Bearer {cf_api_token}", - "Content-Type": "application/json", - "DNT": "1", - "Accept-Encoding": "gzip, deflate, br", - "User-Agent": "ds-bot" - } - self.session.headers.update(self.headers) - - if getenv("SYSTEM_PROMPT") is None: - self.system_message = ("You are a chatbot with the objective to help in any way necessary the user, " - "while providing only true information and keeping the answers short and concise.") - else: - self.system_message = getenv("SYSTEM_PROMPT") - - async def _request( - self, - url: URL, - method: str, - headers: dict[str, Any] | None = None, - payload: dict[str, Any] | None = None, - query: dict[str, Any] | None = None - ) -> dict[str, Any]: - """ - Make a request using aiohttp - :param url: the url to make the request to - :param method: method to use - :param headers: headers to use - :param payload: payload to send (used for POST requests) - :param query: query to append to the url - :return: the response as a dict (json - """ - async with self.session.request( - method, - url, - headers=headers, - json=payload, - params=query - ) as resp: - return await resp.json() - - async def unscoped_prompt( - self, - prompt: str, - raw: bool = False, - max_tokens: int = 256, - *, model: str = DEFAULT_MODEL - ) -> Response: - """ - Send an unscoped prompt to the llm - :param prompt: the prompt to send - :param raw: whether the prompt uses raw parameters in the prompt - :param max_tokens: the maximum number of tokens to generate - :param model: the model to use (list of models: https://developers.cloudflare.com/workers-ai/models/) - :return: the response from the llm - """ - data = { - "prompt": prompt, - "raw": raw, - "stream": False, - "max_tokens": max_tokens - } - - url = self.rest_url / model - response = await self._request(url, "POST", payload=data) - - return self.parse_response(response) - - async def scoped_prompt( - self, - messages: list[Message], - max_tokens: int = 256, - *, model: str = DEFAULT_MODEL - ): - """ - Send a scoped prompt to the llm. - This allows to have a conversation with the llm using the previous messages as knowledge. - :param messages: a list of messages to send - :param max_tokens: the maximum number of tokens to generate - :param model: the model to use (list of models: https://developers.cloudflare.com/workers-ai/models/) - :return: the response from the llm - """ - if messages[0].role != "system": - messages.insert(0, Message( - content=self.system_message, - role="system" - )) - - payload = { - "messages": [asdict(message) for message in messages], - "stream": False, - "max_tokens": max_tokens - } - - url = self.rest_url / model - response = await self._request(url, "POST", payload=payload) - - return self.parse_response(response) - - async def single_scoped_prompt(self, prompt: str, max_tokens: int = 256, model: str = DEFAULT_MODEL) -> Response: - """ - Send a single prompt with system message - :param prompt: prompt to send - :param max_tokens: max number of tokens to generate - :param model: model to use - :return: the response from the llm - """ - messages = [Message(content=prompt, role="user")] - - return await self.scoped_prompt(messages, max_tokens, model=model) - - @staticmethod - def parse_response(response: dict) -> Response: - if "error" in response: - raise commands.CommandError(response["error"]) - - if response.get("success", False) is False: - logger.error(response) - raise commands.CommandError("An error occurred") - - return Response(**response["result"]) - - @app_commands.command(name="ask", description="Ask a question to the assistant") - @app_commands.checks.cooldown(3, 10, key=lambda i: (i.guild_id, i.user.id)) - @app_commands.describe(prompt="The question you want to ask", model="The model to use (use openchat by default)") - async def unscoped_prompt_command(self, interaction: discord.Interaction, prompt: str, model: str | None = None): - """Send a prompt to the assistant""" - # noinspection PyTypeChecker - resp: discord.InteractionResponse = interaction.response - - if model is None: - model = DEFAULT_MODEL - - await resp.defer(thinking=True) - llm_response = await self.unscoped_prompt(prompt, model=model) - - return await interaction.followup.send(llm_response.response) - - -async def setup(bot: commands.Bot) -> None: - logger.debug("Loading assistant cog") - client_id = getenv("CF_ACCOUNT_ID", None) - api_token = getenv("CF_TOKEN", None) - - if client_id is None or api_token is None: - logger.warning("Cloudflare credentials not found, assistant cog will not be loaded") - return - - await bot.add_cog(Assistant(bot, client_id, api_token, )) - logger.info("Assistant cog loaded") diff --git a/dsmusic/client.py b/dsmusic/client.py index d7ea110..39b5545 100644 --- a/dsmusic/client.py +++ b/dsmusic/client.py @@ -1,127 +1,123 @@ -import asyncio -import json -import logging -import os -from os import getenv - -import discord -import mafic -from discord import app_commands -from discord.ext import commands -from mafic import NodeAlreadyConnected - -__all__ = [ - "Client" -] - -logger = logging.getLogger('dsbot') - - -async def response_after_error(interaction: discord.Interaction, message: str): - try: - await interaction.response.send_message(message, ephemeral=True) - except discord.errors.InteractionResponded: - await interaction.followup.send(message, ephemeral=True) - - -class Client(commands.Bot): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # Add nodes - self.pool = mafic.NodePool(self) - - # App commands - self.guild_id = discord.Object(id=getenv("DS_GUILD_ID", 0)) - self.tree.on_error = self.on_tree_error - - # Load config from env vars - self.tracker_enabled = int(getenv("ENABLE_TRACKER", "1")) == 1 - self.assistant_enabled = int(getenv("ENABLE_ASSISTANT", "1")) == 1 - self.music_enabled = int(getenv("ENABLE_MUSIC", "1")) == 1 - - async def setup_hook(self): - logger.info("Loading extensions") - - if self.tracker_enabled: - await self.load_extension("dsmusic.tracker.cog") - - if self.assistant_enabled: - await self.load_extension("dsmusic.assistant.cog") - - if self.music_enabled: - await self.load_extension("dsmusic.music.cog") - - logger.info("Extensions loaded") - - async def on_ready(self): - logger.info(f"Logged in as {self.user}") - - # Add lavalink nodes - if self.music_enabled: - await self.add_nodes() - - if len(self.pool.nodes) == 0: - logger.warning("Disabling music cog") - self.music_enabled = False - - # This copies the global commands over to your guild. - logger.info("Syncing command tree") - self.tree.copy_global_to(guild=self.guild_id) - await self.tree.sync(guild=self.guild_id) - - async def add_nodes(self): - """Add and connect to lavalink nodes""" - # noinspection PyShadowingNames - logger = logging.getLogger('dsbot.lavalink') - logger.info("Adding lavalink nodes") - - if os.path.exists("config/lavalink.json") and os.path.isfile("config/lavalink.json"): - with open("config/lavalink.json") as f: - data = json.load(f) - elif os.path.isdir("config/lavalink.json"): - logger.error("Lavalink config is a directory") - return - else: - with open("config/lavalink.json", "w") as f: - f.write("[]") - logger.error("Lavalink config not available") - return - - for node_info in data: - try: - async with asyncio.timeout(10): - await self.pool.create_node( - host=node_info["uri"], - port=node_info["port"], - label=f"CONFIG-{data.index(node_info)}", - password=node_info["password"], - secure=False, - timeout=5, - ) - logger.info(f"Node {node_info['uri']} added") - except NodeAlreadyConnected: - pass - except (TimeoutError, asyncio.TimeoutError) as e: - logger.error(f"Node {node_info['uri']}:{node_info['port']} timed out. {e}") - except RuntimeError as e: - logger.error(f"Node {node_info['uri']}:{node_info['port']} failed. {e}") - except Exception as e: - logger.error(e) - - if len(self.pool.nodes) == 0: - logger.error("No nodes connected") - else: - logger.info(f"{len(self.pool.nodes)} nodes connected") - - # noinspection PyUnresolvedReferences - @staticmethod - async def on_tree_error(interaction: discord.Interaction, error: app_commands.AppCommandError): - if isinstance(error, app_commands.CommandOnCooldown): - return await response_after_error(interaction, f"You are currently on cooldown!") - elif isinstance(error, app_commands.MissingPermissions): - return await response_after_error( - interaction, "I don't have the required permissions to execute this command") - else: - logger.error(error) - return await response_after_error(interaction, "An error occurred") +import asyncio +import json +import logging +import os +from os import getenv + +import discord +import mafic +from discord import app_commands +from discord.ext import commands +from mafic import NodeAlreadyConnected + +__all__ = [ + "Client" +] + +logger = logging.getLogger('dsbot') + + +async def response_after_error(interaction: discord.Interaction, message: str): + try: + await interaction.response.send_message(message, ephemeral=True) + except discord.errors.InteractionResponded: + await interaction.followup.send(message, ephemeral=True) + + +class Client(commands.Bot): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # Add nodes + self.pool = mafic.NodePool(self) + + # App commands + self.guild_id = discord.Object(id=getenv("DS_GUILD_ID", 0)) + self.tree.on_error = self.on_tree_error + + # Load config from env vars + self.tracker_enabled = int(getenv("ENABLE_TRACKER", "1")) == 1 + self.music_enabled = int(getenv("ENABLE_MUSIC", "1")) == 1 + + async def setup_hook(self): + logger.info("Loading extensions") + + if self.tracker_enabled: + await self.load_extension("dsmusic.tracker.cog") + + if self.music_enabled: + await self.load_extension("dsmusic.music.cog") + + logger.info("Extensions loaded") + + async def on_ready(self): + logger.info(f"Logged in as {self.user}") + + # Add lavalink nodes + if self.music_enabled: + await self.add_nodes() + + if len(self.pool.nodes) == 0: + logger.warning("Disabling music cog") + self.music_enabled = False + + # This copies the global commands over to your guild. + logger.info("Syncing command tree") + self.tree.copy_global_to(guild=self.guild_id) + await self.tree.sync(guild=self.guild_id) + + async def add_nodes(self): + """Add and connect to lavalink nodes""" + # noinspection PyShadowingNames + logger = logging.getLogger('dsbot.lavalink') + logger.info("Adding lavalink nodes") + + if os.path.exists("config/lavalink.json") and os.path.isfile("config/lavalink.json"): + with open("config/lavalink.json") as f: + data = json.load(f) + elif os.path.isdir("config/lavalink.json"): + logger.error("Lavalink config is a directory") + return + else: + with open("config/lavalink.json", "w") as f: + f.write("[]") + logger.error("Lavalink config not available") + return + + for node_info in data: + try: + async with asyncio.timeout(10): + await self.pool.create_node( + host=node_info["uri"], + port=node_info["port"], + label=f"CONFIG-{data.index(node_info)}", + password=node_info["password"], + secure=False, + timeout=5, + ) + logger.info(f"Node {node_info['uri']} added") + except NodeAlreadyConnected: + pass + except (TimeoutError, asyncio.TimeoutError) as e: + logger.error(f"Node {node_info['uri']}:{node_info['port']} timed out. {e}") + except RuntimeError as e: + logger.error(f"Node {node_info['uri']}:{node_info['port']} failed. {e}") + except Exception as e: + logger.error(e) + + if len(self.pool.nodes) == 0: + logger.error("No nodes connected") + else: + logger.info(f"{len(self.pool.nodes)} nodes connected") + + # noinspection PyUnresolvedReferences + @staticmethod + async def on_tree_error(interaction: discord.Interaction, error: app_commands.AppCommandError): + if isinstance(error, app_commands.CommandOnCooldown): + return await response_after_error(interaction, f"You are currently on cooldown!") + elif isinstance(error, app_commands.MissingPermissions): + return await response_after_error( + interaction, "I don't have the required permissions to execute this command") + else: + logger.error(error) + return await response_after_error(interaction, "An error occurred") diff --git a/dsmusic/music/queue.py b/dsmusic/music/queue.py index e10270d..f4a8fe2 100644 --- a/dsmusic/music/queue.py +++ b/dsmusic/music/queue.py @@ -1,198 +1,207 @@ -from random import randint -from typing import Optional - -import discord -from mafic import Track, Playlist - -__all__ = [ - "Queue" -] - - -def playlist_embed(result: Playlist) -> discord.Embed: - if result.tracks[0].source == "youtube": - color = discord.Color.red() - elif result.tracks[0].source == "soundcloud": - color = discord.Color.orange() - else: - color = discord.Color.random() - - embed = discord.Embed(color=color) - - embed.title = result.name - embed.description = "Playlist" - embed.set_thumbnail(url=result.tracks[0].artwork_url) - - return embed - - -def track_embed(result: Track) -> discord.Embed: - if result.source == "twitch": - color = discord.Color.purple() - elif result.source == "youtube": - color = discord.Color.red() - elif result.source == "soundcloud": - color = discord.Color.orange() - else: - color = discord.Color.random() - - embed = discord.Embed(color=color) - - embed.title = result.title - embed.url = result.uri - embed.set_author(name=result.author) - embed.set_thumbnail(url=result.artwork_url) - - if not result.stream: - embed.add_field(name="Video duration", value=parse_seconds(result.length // 1000)) - else: - embed.add_field(name="Live on", value=result.source) - - return embed - - -def parse_seconds(seconds: int) -> str: - minutes, seconds = divmod(seconds, 60) - hours, minutes = divmod(minutes, 60) - - result = "%(minutes)02d:%(seconds)02d" % {"minutes": minutes, "seconds": seconds} - - if hours != 0: - result = f"{hours}:" + result - return result - - -class Queue: - _current: Track | None = None - _queue: list[Track] = [] - _queue_length: int = 0 - - _loop_queue: bool = False - _loop_current: bool = False - _shuffle: bool = False - - def toggle_loop(self, status: Optional[bool] = None) -> bool: - """ - Loop the current queue - :param status: force a certain status on the loop queue - :return: the current status - """ - if status: - self._loop_queue = status - else: - self._loop_queue = not self._loop_queue - - return self._loop_queue - - def toggle_repeat(self, status: Optional[bool] = None) -> bool: - """ - Repeat the current song - :param status: force a certain status on the repeat value - :return: the current status - """ - if status: - self._loop_current = status - else: - self._loop_current = not self._loop_current - - return self._loop_current - - def toggle_shuffle(self, status: Optional[bool] = None) -> bool: - """ - Play the queue in a random order - :param status: force a certain status on the shuffle value - :return: the current status - """ - if status: - self._shuffle = status - else: - self._shuffle = not self._shuffle - - return self._shuffle - - def _add_to_queue(self, track: Track) -> int: - """ - Add a track to the queue - :param track: the track to add - :return: if the track was added - """ - track_length = track.length // 1000 - - if track_length > 3600: # max 1 hour - return -2 - elif self._queue_length + track_length >= 8200: # total max 132 minutes - return -1 - elif len(self._queue) > 48: # max 48 songs - return 0 - else: - self._queue_length += track_length - self._queue.append(track) - return 1 - - def add(self, data: Playlist | Track | list) -> discord.Embed | None: - """ - Add a playlist or a single track to the queue - - :param data: A playlist or a single track - :return: an Embed for the added object - """ - if isinstance(data, Track): - if self._add_to_queue(track=data) == 1: - embed = track_embed(data) - else: - return None - elif isinstance(data, Playlist): - added = 0 - for track in data.tracks: - ret = self._add_to_queue(track=track) - if ret == 1: - added += 1 - elif ret == 0: - break - embed = playlist_embed(data).add_field(name="Number of videos", value=added) - elif isinstance(data, list): - return self.add(data[0]) - else: - return None - - return embed - - def next(self) -> Track | None: - """ - Get the next track to play - :return: a Track object - """ - if len(self._queue) == 0: - _current = None - return None - - if self._loop_current: - return self._current - - if self._shuffle: - index = randint(0, len(self._queue)) - else: - index = 0 - - track = self._queue.pop(index) - - if self._loop_queue: - self._queue.append(track) - else: - self._queue_length -= track.length - - _current = Track - return track - - def clean(self) -> int: - """ - Reset the queue removing all the elements - :return: the number of elements removed - """ - size = len(self._queue) - del self._queue - self._queue = [] - self._queue_length = 0 - self._current = None - - return size +import logging +from random import randint +from typing import Optional + +import discord +from mafic import Track, Playlist + +__all__ = [ + "Queue" +] + + +logger = logging.getLogger('dsbot.music.queue') + + +def playlist_embed(result: Playlist) -> discord.Embed: + if result.tracks[0].source == "youtube": + color = discord.Color.red() + elif result.tracks[0].source == "soundcloud": + color = discord.Color.orange() + else: + color = discord.Color.random() + + embed = discord.Embed(color=color) + + embed.title = result.name + embed.description = "Playlist" + embed.set_thumbnail(url=result.tracks[0].artwork_url) + + return embed + + +def track_embed(result: Track) -> discord.Embed: + if result.source == "twitch": + color = discord.Color.purple() + elif result.source == "youtube": + color = discord.Color.red() + elif result.source == "soundcloud": + color = discord.Color.orange() + else: + color = discord.Color.random() + + embed = discord.Embed(color=color) + + embed.title = result.title + embed.url = result.uri + embed.set_author(name=result.author) + embed.set_thumbnail(url=result.artwork_url) + + if not result.stream: + embed.add_field(name="Video duration", value=parse_seconds(result.length // 1000)) + else: + embed.add_field(name="Live on", value=result.source) + + return embed + + +def parse_seconds(seconds: int) -> str: + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + + result = "%(minutes)02d:%(seconds)02d" % {"minutes": minutes, "seconds": seconds} + + if hours != 0: + result = f"{hours}:" + result + return result + + +class Queue: + _current: Track | None = None + _queue: list[Track] = [] + _queue_length: int = 0 + + _loop_queue: bool = False + _loop_current: bool = False + _shuffle: bool = False + + def toggle_loop(self, status: Optional[bool] = None) -> bool: + """ + Loop the current queue + :param status: force a certain status on the loop queue + :return: the current status + """ + if status: + self._loop_queue = status + else: + self._loop_queue = not self._loop_queue + + return self._loop_queue + + def toggle_repeat(self, status: Optional[bool] = None) -> bool: + """ + Repeat the current song + :param status: force a certain status on the repeat value + :return: the current status + """ + if status: + self._loop_current = status + else: + self._loop_current = not self._loop_current + + return self._loop_current + + def toggle_shuffle(self, status: Optional[bool] = None) -> bool: + """ + Play the queue in a random order + :param status: force a certain status on the shuffle value + :return: the current status + """ + if status: + self._shuffle = status + else: + self._shuffle = not self._shuffle + + return self._shuffle + + def _add_to_queue(self, track: Track) -> int: + """ + Add a track to the queue + :param track: the track to add + :return: if the track was added + """ + track_length = track.length // 1000 + + if track_length > 3600: # max 1 hour + return -2 + elif self._queue_length + track_length >= 8200: # total max 132 minutes + return -1 + elif len(self._queue) > 48: # max 48 songs + return 0 + else: + self._queue_length += track_length + self._queue.append(track) + return 1 + + def add(self, data: Playlist | Track | list) -> discord.Embed | None: + """ + Add a playlist or a single track to the queue + + :param data: A playlist or a single track + :return: an Embed for the added object + """ + if isinstance(data, Track): + if self._add_to_queue(track=data) == 1: + embed = track_embed(data) + else: + return None + elif isinstance(data, Playlist): + added = 0 + for track in data.tracks: + ret = self._add_to_queue(track=track) + if ret == 1: + added += 1 + elif ret == 0: + break + embed = playlist_embed(data).add_field(name="Number of videos", value=added) + elif isinstance(data, list): + return self.add(data[0]) + else: + return None + + return embed + + def next(self) -> Track | None: + """ + Get the next track to play + :return: a Track object + """ + if len(self._queue) == 0: + self._current = None + return None + + if self._loop_current: + return self._current + + if self._shuffle: + index = randint(0, len(self._queue)) % len(self._queue) + else: + index = 0 + + try: + track = self._queue.pop(index) + except IndexError: + logger.error(f"IndexError in Queue.next, queue length: {len(self._queue)}, index: {index}") + self._current = None + return None + + if self._loop_queue: + self._queue.append(track) + else: + self._queue_length -= track.length + + self._current = track + return track + + def clean(self) -> int: + """ + Reset the queue removing all the elements + :return: the number of elements removed + """ + size = len(self._queue) + del self._queue + self._queue = [] + self._queue_length = 0 + self._current = None + + return size diff --git a/pyproject.toml b/pyproject.toml index 7b6ef8e..2f7d827 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dsmusic" -version = "0.6.13" +version = "0.6.14" description = "A simple music bot" license = "MIT" authors = ["John Toniutti "]