diff --git a/.github/workflows/check.yml b/.github/workflows/check.yml index 8462f34514..b39a00fc92 100644 --- a/.github/workflows/check.yml +++ b/.github/workflows/check.yml @@ -20,7 +20,7 @@ jobs: python -m pip install --upgrade pip pip install -r requirements/dev.txt - run: - codespell --ignore-words-list="groupt,nd,ot,ro,falsy,BU" \ + codespell --ignore-words-list="groupt,nd,ot,ro,falsy,BU,invokable" \ --exclude-file=".github/workflows/codespell.yml" bandit: runs-on: ubuntu-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index fea137c75a..fea2150498 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,25 @@ These changes are available on the `master` branch, but have not yet been releas - Added `suppress` and `allowed_mentions` parameters to `Webhook` and `InteractionResponse` edit methods. ([#2138](https://github.com/Pycord-Development/pycord/pull/2138)) +- Added `_parse_arguments` function to slash commands to parse arguments instead of in + `prepare`. ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) +- Added the `parents`, `root_parent`, and `cog_name` attribute to slash commands. + ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) +- Added the `error` decorator to slash commands. + ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) +- Added the `cooldown_after_parsing` parameter & attribute to slash commands. + ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) +- Added the `reinvoke` function to `ApplicationContext`. + ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) +- Added the `invoked_with`, `invoked_parents`, `invoked_subcommand`, + `subcommand_passed`, and `command_failed` parameters & attributes to + `ApplicationContext`. + ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) +- Added the `source` attribute to `ext.commands.Context` & `ApplicationContext` for a + common way to either retrieve the message or the interaction that triggered the + command. ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) +- Documented `ContextMenuCommand`. + ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) - Added `wait_finish` parameter to `VoiceClient.play` for awaiting the end of a play. ([#2194](https://github.com/Pycord-Development/pycord/pull/2194)) - Added support for custom bot status. @@ -113,6 +132,13 @@ These changes are available on the `master` branch, but have not yet been releas ([#2099](https://github.com/Pycord-Development/pycord/pull/2099)) - Changed the support from `orjson` to `msgspec` in the codebase. ([#2170](https://github.com/Pycord-Development/pycord/pull/2170)) +- Renamed `_invoke` in application commands to `invoke`. + ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) +- Deprecated `ext/commands/cooldowns` in favour of `commands/cooldowns`. + ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) +- Deprecated `ApplicationCommandError` & `ApplicationCommandInvokeError` in favour of + `CommandError` & `CommandInvokeError` respectively. + ([#1606](https://github.com/Pycord-Development/pycord/pull/1606)) ### Removed diff --git a/discord/bot.py b/discord/bot.py index 545c11bb8d..89d5b592f1 100644 --- a/discord/bot.py +++ b/discord/bot.py @@ -42,6 +42,7 @@ ApplicationCommand, ApplicationContext, AutocompleteContext, + BaseContext, MessageCommand, SlashCommand, SlashCommandGroup, @@ -862,8 +863,9 @@ async def process_application_commands( ) ctx = await self.get_application_context(interaction) - if command: + if command and not ctx.command: ctx.command = command + await self.invoke_application_command(ctx) async def on_application_command_auto_complete( @@ -1212,7 +1214,6 @@ async def on_application_command_error( ) # global check registration - # TODO: Remove these from commands.Bot def check(self, func): """A decorator that adds a global check to the bot. A global check is similar to a :func:`.check` that is @@ -1222,8 +1223,8 @@ def check(self, func): .. note:: This function can either be a regular function or a coroutine. Similar to a command :func:`.check`, this - takes a single parameter of type :class:`.Context` and can only raise exceptions inherited from - :exc:`.ApplicationCommandError`. + takes a single parameter of type :class:`.BaseContext` and can only raise exceptions inherited from + :exc:`.CommandError`. Example ------- @@ -1282,14 +1283,14 @@ def check_once(self, func): .. note:: - When using this function the :class:`.Context` sent to a group subcommand may only parse the parent command + When using this function the :class:`.BaseContext` sent to a group subcommand may only parse the parent command and not the subcommands due to it being invoked once per :meth:`.Bot.invoke` call. .. note:: This function can either be a regular function or a coroutine. Similar to a command :func:`.check`, - this takes a single parameter of type :class:`.Context` and can only raise exceptions inherited from - :exc:`.ApplicationCommandError`. + this takes a single parameter of type :class:`.BaseContext` and can only raise exceptions inherited from + :exc:`.CommandError`. Example ------- @@ -1318,7 +1319,7 @@ def before_invoke(self, coro): A pre-invoke hook is called directly before the command is called. This makes it a useful function to set up database connections or any type of set up required. - This pre-invoke hook takes a sole parameter, a :class:`.Context`. + This pre-invoke hook takes a sole parameter, a :class:`.BaseContext`. .. note:: @@ -1348,7 +1349,7 @@ def after_invoke(self, coro): A post-invoke hook is called directly after the command is called. This makes it a useful function to clean-up database connections or any type of clean up required. - This post-invoke hook takes a sole parameter, a :class:`.Context`. + This post-invoke hook takes a sole parameter, a :class:`.BaseContext`. .. note:: diff --git a/discord/cog.py b/discord/cog.py index 686575db6f..5fed6e67f9 100644 --- a/discord/cog.py +++ b/discord/cog.py @@ -37,7 +37,8 @@ from . import errors from .commands import ( ApplicationCommand, - ApplicationContext, + BaseContext, + Invokable, SlashCommandGroup, _BaseCommand, ) @@ -230,7 +231,7 @@ def __new__(cls: type[CogMeta], *args: Any, **kwargs: Any) -> CogMeta: # Either update the command with the cog provided defaults or copy it. # r.e type ignore, type-checker complains about overriding a ClassVar - new_cls.__cog_commands__ = tuple(c._update_copy(cmd_attrs) if not hasattr(c, "add_to") else c for c in new_cls.__cog_commands__) # type: ignore + new_cls.__cog_commands__ = tuple(c.copy(cmd_attrs) if not hasattr(c, "add_to") else c for c in new_cls.__cog_commands__) # type: ignore name_filter = lambda c: ( "app" @@ -313,7 +314,7 @@ def __new__(cls: type[CogT], *args: Any, **kwargs: Any) -> CogT: # To do this, we need to interfere with the Cog creation process. return super().__new__(cls) - def get_commands(self) -> list[ApplicationCommand]: + def get_commands(self) -> list[Invokable]: r""" Returns -------- @@ -345,12 +346,12 @@ def description(self) -> str: def description(self, description: str) -> None: self.__cog_description__ = description - def walk_commands(self) -> Generator[ApplicationCommand, None, None]: + def walk_commands(self) -> Generator[Invokable, None, None]: """An iterator that recursively walks through this cog's commands and subcommands. Yields ------ - Union[:class:`.Command`, :class:`.Group`] + Union[:class:`.Invokable`] A command or group from the cog. """ for command in self.__cog_commands__: @@ -440,7 +441,7 @@ def cog_unload(self) -> None: """ @_cog_special_method - def bot_check_once(self, ctx: ApplicationContext) -> bool: + def bot_check_once(self, ctx: BaseContext) -> bool: """A special method that registers as a :meth:`.Bot.check_once` check. @@ -449,45 +450,45 @@ def bot_check_once(self, ctx: ApplicationContext) -> bool: Parameters ---------- - ctx: :class:`.Context` + ctx: :class:`.BaseContext` The invocation context. """ return True @_cog_special_method - def bot_check(self, ctx: ApplicationContext) -> bool: + def bot_check(self, ctx: BaseContext) -> bool: """A special method that registers as a :meth:`.Bot.check` check. This function **can** be a coroutine and must take a sole parameter, - ``ctx``, to represent the :class:`.Context` or :class:`.ApplicationContext`. + ``ctx``, to represent a subclass of :class:`BaseContext` (either :class:`.Context` + or :class:`.ApplicationContext`). Parameters ---------- - ctx: :class:`.Context` + ctx: :class:`.BaseContext` The invocation context. """ return True @_cog_special_method - def cog_check(self, ctx: ApplicationContext) -> bool: + def cog_check(self, ctx: BaseContext) -> bool: """A special method that registers as a :func:`~discord.ext.commands.check` for every command and subcommand in this cog. This function **can** be a coroutine and must take a sole parameter, - ``ctx``, to represent the :class:`.Context` or :class:`.ApplicationContext`. + ``ctx``, to represent a subclass of :class:`BaseContext` (either :class:`.Context` + or :class:`.ApplicationContext`). Parameters ---------- - ctx: :class:`.Context` + ctx: :class:`.BaseContext` The invocation context. """ return True @_cog_special_method - async def cog_command_error( - self, ctx: ApplicationContext, error: Exception - ) -> None: + async def cog_command_error(self, ctx: BaseContext, error: Exception) -> None: """A special method that is called whenever an error is dispatched inside this cog. @@ -498,37 +499,37 @@ async def cog_command_error( Parameters ---------- - ctx: :class:`.ApplicationContext` + ctx: :class:`.BaseContext` The invocation context where the error happened. error: :class:`ApplicationCommandError` The error that happened. """ @_cog_special_method - async def cog_before_invoke(self, ctx: ApplicationContext) -> None: + async def cog_before_invoke(self, ctx: BaseContext) -> None: """A special method that acts as a cog local pre-invoke hook. - This is similar to :meth:`.ApplicationCommand.before_invoke`. + This is similar to :meth:`.Invokable.before_invoke`. This **must** be a coroutine. Parameters ---------- - ctx: :class:`.ApplicationContext` + ctx: :class:`.BaseContext` The invocation context. """ @_cog_special_method - async def cog_after_invoke(self, ctx: ApplicationContext) -> None: + async def cog_after_invoke(self, ctx: BaseContext) -> None: """A special method that acts as a cog local post-invoke hook. - This is similar to :meth:`.ApplicationCommand.after_invoke`. + This is similar to :meth:`.BaseContext.after_invoke`. This **must** be a coroutine. Parameters ---------- - ctx: :class:`.ApplicationContext` + ctx: :class:`.BaseContext` The invocation context. """ diff --git a/discord/commands/__init__.py b/discord/commands/__init__.py index 1813faf3dc..259500cf34 100644 --- a/discord/commands/__init__.py +++ b/discord/commands/__init__.py @@ -25,5 +25,6 @@ from .context import * from .core import * +from .mixins import * from .options import * from .permissions import * diff --git a/discord/commands/context.py b/discord/commands/context.py index 576a804494..9968fe26fb 100644 --- a/discord/commands/context.py +++ b/discord/commands/context.py @@ -30,20 +30,17 @@ from discord.interactions import Interaction, InteractionMessage, InteractionResponse from discord.webhook.async_ import Webhook +from .mixins import BaseContext + if TYPE_CHECKING: from typing_extensions import ParamSpec import discord from .. import Bot - from ..state import ConnectionState - from ..voice_client import VoiceProtocol from .core import ApplicationCommand, Option from ..interactions import InteractionChannel - from ..guild import Guild - from ..member import Member from ..message import Message - from ..user import User from ..permissions import Permissions from ..client import ClientUser @@ -65,7 +62,7 @@ __all__ = ("ApplicationContext", "AutocompleteContext") -class ApplicationContext(discord.abc.Messageable): +class ApplicationContext(BaseContext): """Represents a Discord application command interaction context. This class is not created manually and is instead passed to application @@ -83,85 +80,74 @@ class ApplicationContext(discord.abc.Messageable): The command that this context belongs to. """ - def __init__(self, bot: Bot, interaction: Interaction): - self.bot = bot + command: ApplicationCommand | None + + def __init__( + self, + bot: Bot, + interaction: Interaction, + *, + command: ApplicationCommand | None = None, + args: list[Any] = None, + kwargs: dict[str, Any] = None, + **kwargs2, + ): + super().__init__(bot=bot, command=command, args=args, kwargs=kwargs, **kwargs2) + self.interaction = interaction # below attributes will be set after initialization - self.command: ApplicationCommand = None # type: ignore self.focused: Option = None # type: ignore self.value: str = None # type: ignore self.options: dict = None # type: ignore - self._state: ConnectionState = self.interaction._state - - async def _get_channel(self) -> InteractionChannel | None: - return self.interaction.channel + async def reinvoke(self, *, call_hooks: bool = False, restart: bool = True) -> None: + """|coro| - async def invoke( - self, - command: ApplicationCommand[CogT, P, T], - /, - *args: P.args, - **kwargs: P.kwargs, - ) -> T: - r"""|coro| + Calls the command again. - Calls a command with the arguments given. - This is useful if you want to just call the callback that a - :class:`.ApplicationCommand` holds internally. + This is similar to :meth:`~.BaseContext.invoke` except that it bypasses + checks, cooldowns, and error handlers. .. note:: - This does not handle converters, checks, cooldowns, pre-invoke, - or after-invoke hooks in any matter. It calls the internal callback - directly as-if it was a regular function. - You must take care in passing the proper arguments when - using this function. + If you want to bypass :exc:`.UserInputError` derived exceptions, + it is recommended to use the regular :meth:`~.Context.invoke` + as it will work more naturally. After all, this will end up + using the old arguments the user has used and will thus just + fail again. Parameters - ----------- - command: :class:`.ApplicationCommand` - The command that is going to be called. - \*args - The arguments to use. - \*\*kwargs - The keyword arguments to use. + ---------- + call_hooks: :class:`bool` + Whether to call the before and after invoke hooks. + restart: :class:`bool` + Whether to start the call chain from the very beginning + or where we left off (i.e. the command that caused the error). + The default is to start where we left off. Raises - ------- - TypeError - The command argument to invoke is missing. - """ - return await command(self, *args, **kwargs) - - @cached_property - def channel(self) -> InteractionChannel | None: - """Union[:class:`abc.GuildChannel`, :class:`PartialMessageable`, :class:`Thread`]: - Returns the channel associated with this context's command. Shorthand for :attr:`.Interaction.channel`. + ------ + ValueError + The context to reinvoke is not valid. """ - return self.interaction.channel + cmd = self.command + if cmd is None: + raise ValueError("This context is not valid.") - @cached_property - def channel_id(self) -> int | None: - """Returns the ID of the channel associated with this context's command. - Shorthand for :attr:`.Interaction.channel_id`. - """ - return self.interaction.channel_id + if restart: + to_call = cmd.root_parent or cmd + else: + to_call = cmd - @cached_property - def guild(self) -> Guild | None: - """Returns the guild associated with this context's command. - Shorthand for :attr:`.Interaction.guild`. - """ - return self.interaction.guild + try: + await to_call.reinvoke(self, call_hooks=call_hooks) + finally: + self.command = cmd - @cached_property - def guild_id(self) -> int | None: - """Returns the ID of the guild associated with this context's command. - Shorthand for :attr:`.Interaction.guild_id`. - """ - return self.interaction.guild_id + @property + def source(self) -> Interaction: + return self.interaction @cached_property def locale(self) -> str | None: @@ -181,18 +167,6 @@ def guild_locale(self) -> str | None: def app_permissions(self) -> Permissions: return self.interaction.app_permissions - @cached_property - def me(self) -> Member | ClientUser | None: - """Union[:class:`.Member`, :class:`.ClientUser`]: - Similar to :attr:`.Guild.me` except it may return the :class:`.ClientUser` in private message - message contexts, or when :meth:`Intents.guilds` is absent. - """ - return ( - self.interaction.guild.me - if self.interaction.guild is not None - else self.bot.user - ) - @cached_property def message(self) -> Message | None: """Returns the message sent with this context's command. @@ -200,25 +174,6 @@ def message(self) -> Message | None: """ return self.interaction.message - @cached_property - def user(self) -> Member | User: - """Returns the user that sent this context's command. - Shorthand for :attr:`.Interaction.user`. - """ - return self.interaction.user # type: ignore # command user will never be None - - author: Member | User = user - - @property - def voice_client(self) -> VoiceProtocol | None: - """Returns the voice client associated with this context's command. - Shorthand for :attr:`Interaction.guild.voice_client<~discord.Guild.voice_client>`, if applicable. - """ - if self.interaction.guild is None: - return None - - return self.interaction.guild.voice_client - @cached_property def response(self) -> InteractionResponse: """Returns the response object associated with this context's command. @@ -334,19 +289,9 @@ async def delete(self, *, delay: float | None = None) -> None: def edit(self) -> Callable[..., Awaitable[InteractionMessage]]: return self.interaction.edit_original_response - @property - def cog(self) -> Cog | None: - """Returns the cog associated with this context's command. - ``None`` if it does not exist. - """ - if self.command is None: - return None - - return self.command.cog - class AutocompleteContext: - """Represents context for a slash command's option autocomplete. + """Represents context for a slash command's option autocomplete. This ***does not*** inherent from :class:`.BaseContext`. This class is not created manually and is instead passed to an :class:`.Option`'s autocomplete callback. diff --git a/discord/commands/cooldowns.py b/discord/commands/cooldowns.py new file mode 100644 index 0000000000..761b4274fb --- /dev/null +++ b/discord/commands/cooldowns.py @@ -0,0 +1,399 @@ +""" +The MIT License (MIT) + +Copyright (c) 2015-2021 Rapptz +Copyright (c) 2021-present Pycord Development + +Permission is hereby granted, free of charge, to any person obtaining a +copy of this software and associated documentation files (the "Software"), +to deal in the Software without restriction, including without limitation +the rights to use, copy, modify, merge, publish, distribute, sublicense, +and/or sell copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. +""" + +from __future__ import annotations + +import asyncio +import time +from collections import deque +from typing import TYPE_CHECKING, Any, Callable, Deque, TypeVar + +from discord.enums import Enum + +from ..abc import PrivateChannel +from ..errors import MaxConcurrencyReached + +if TYPE_CHECKING: + from ..message import Message + +__all__ = ( + "BucketType", + "Cooldown", + "CooldownMapping", + "DynamicCooldownMapping", + "MaxConcurrency", +) + +C = TypeVar("C", bound="CooldownMapping") +MC = TypeVar("MC", bound="MaxConcurrency") + + +class BucketType(Enum): + default = 0 + user = 1 + guild = 2 + channel = 3 + member = 4 + category = 5 + role = 6 + + def get_key(self, msg: Message) -> Any: + if self is BucketType.user: + return msg.author.id + elif self is BucketType.guild: + return (msg.guild or msg.author).id + elif self is BucketType.channel: + return msg.channel.id + elif self is BucketType.member: + return (msg.guild and msg.guild.id), msg.author.id + elif self is BucketType.category: + return (msg.channel.category or msg.channel).id # type: ignore + elif self is BucketType.role: + # we return the channel id of a private-channel as there are only roles in guilds + # and that yields the same result as for a guild with only the @everyone role + # NOTE: PrivateChannel doesn't actually have an id attribute, but we assume we are + # receiving a DMChannel or GroupChannel which inherit from PrivateChannel and do + return (msg.channel if isinstance(msg.channel, PrivateChannel) else msg.author.top_role).id # type: ignore + + def __call__(self, msg: Message) -> Any: + return self.get_key(msg) + + +class Cooldown: + """Represents a cooldown for a command. + + Attributes + ---------- + rate: :class:`int` + The total number of tokens available per :attr:`per` seconds. + per: :class:`float` + The length of the cooldown period in seconds. + """ + + __slots__ = ("rate", "per", "_window", "_tokens", "_last") + + def __init__(self, rate: float, per: float) -> None: + self.rate: int = int(rate) + self.per: float = float(per) + self._window: float = 0.0 + self._tokens: int = self.rate + self._last: float = 0.0 + + def get_tokens(self, current: float | None = None) -> int: + """Returns the number of available tokens before rate limiting is applied. + + Parameters + ---------- + current: Optional[:class:`float`] + The time in seconds since Unix epoch to calculate tokens at. + If not supplied then :func:`time.time()` is used. + + Returns + ------- + :class:`int` + The number of tokens available before the cooldown is to be applied. + """ + if not current: + current = time.time() + + tokens = self._tokens + + if current > self._window + self.per: + tokens = self.rate + return tokens + + def get_retry_after(self, current: float | None = None) -> float: + """Returns the time in seconds until the cooldown will be reset. + + Parameters + ---------- + current: Optional[:class:`float`] + The current time in seconds since Unix epoch. + If not supplied, then :func:`time.time()` is used. + + Returns + ------- + :class:`float` + The number of seconds to wait before this cooldown will be reset. + """ + current = current or time.time() + tokens = self.get_tokens(current) + + if tokens == 0: + return self.per - (current - self._window) + + return 0.0 + + def update_rate_limit(self, current: float | None = None) -> float | None: + """Updates the cooldown rate limit. + + Parameters + ---------- + current: Optional[:class:`float`] + The time in seconds since Unix epoch to update the rate limit at. + If not supplied, then :func:`time.time()` is used. + + Returns + ------- + Optional[:class:`float`] + The retry-after time in seconds if rate limited. + """ + current = current or time.time() + self._last = current + + self._tokens = self.get_tokens(current) + + # first token used means that we start a new rate limit window + if self._tokens == self.rate: + self._window = current + + # check if we are rate limited + if self._tokens == 0: + return self.per - (current - self._window) + + # we're not so decrement our tokens + self._tokens -= 1 + + def reset(self) -> None: + """Reset the cooldown to its initial state.""" + self._tokens = self.rate + self._last = 0.0 + + def copy(self) -> Cooldown: + """Creates a copy of this cooldown. + + Returns + ------- + :class:`Cooldown` + A new instance of this cooldown. + """ + return Cooldown(self.rate, self.per) + + def __repr__(self) -> str: + return f"" + + +class CooldownMapping: + def __init__( + self, + original: Cooldown | None, + type: Callable[[Message], Any], + ) -> None: + if not callable(type): + raise TypeError("Cooldown type must be a BucketType or callable") + + self._cache: dict[Any, Cooldown] = {} + self._cooldown: Cooldown | None = original + self._type: Callable[[Message], Any] = type + + def copy(self) -> CooldownMapping: + ret = CooldownMapping(self._cooldown, self._type) + ret._cache = self._cache.copy() + return ret + + @property + def valid(self) -> bool: + return self._cooldown is not None + + @property + def type(self) -> Callable[[Message], Any]: + return self._type + + @classmethod + def from_cooldown(cls: type[C], rate, per, type) -> C: + return cls(Cooldown(rate, per), type) + + def _bucket_key(self, msg: Message) -> Any: + return self._type(msg) + + def _verify_cache_integrity(self, current: float | None = None) -> None: + # we want to delete all cache objects that haven't been used + # in a cooldown window. e.g. if we have a command that has a + # cooldown of 60s, and it has not been used in 60s then that key should be deleted + current = current or time.time() + dead_keys = [k for k, v in self._cache.items() if current > v._last + v.per] + for k in dead_keys: + del self._cache[k] + + def create_bucket(self, message: Message) -> Cooldown: + return self._cooldown.copy() # type: ignore + + def get_bucket(self, message: Message, current: float | None = None) -> Cooldown: + if self._type is BucketType.default: + return self._cooldown # type: ignore + + self._verify_cache_integrity(current) + key = self._bucket_key(message) + if key not in self._cache: + bucket = self.create_bucket(message) + if bucket is not None: + self._cache[key] = bucket + else: + bucket = self._cache[key] + + return bucket + + def update_rate_limit( + self, message: Message, current: float | None = None + ) -> float | None: + bucket = self.get_bucket(message, current) + return bucket.update_rate_limit(current) + + +class DynamicCooldownMapping(CooldownMapping): + def __init__( + self, factory: Callable[[Message], Cooldown], type: Callable[[Message], Any] + ) -> None: + super().__init__(None, type) + self._factory: Callable[[Message], Cooldown] = factory + + def copy(self) -> DynamicCooldownMapping: + ret = DynamicCooldownMapping(self._factory, self._type) + ret._cache = self._cache.copy() + return ret + + @property + def valid(self) -> bool: + return True + + def create_bucket(self, message: Message) -> Cooldown: + return self._factory(message) + + +class _Semaphore: + """This class is a version of a semaphore. + + If you're wondering why asyncio.Semaphore isn't being used, + it's because it doesn't expose the internal value. This internal + value is necessary because I need to support both `wait=True` and + `wait=False`. + + An asyncio.Queue could have been used to do this as well -- but it is + not as inefficient since internally that uses two queues and is a bit + overkill for what is basically a counter. + """ + + __slots__ = ("value", "loop", "_waiters") + + def __init__(self, number: int) -> None: + self.value: int = number + self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() + self._waiters: Deque[asyncio.Future] = deque() + + def __repr__(self) -> str: + return f"<_Semaphore value={self.value} waiters={len(self._waiters)}>" + + def locked(self) -> bool: + return self.value == 0 + + def is_active(self) -> bool: + return len(self._waiters) > 0 + + def wake_up(self) -> None: + while self._waiters: + future = self._waiters.popleft() + if not future.done(): + future.set_result(None) + return + + async def acquire(self, *, wait: bool = False) -> bool: + if not wait and self.value <= 0: + # signal that we're not acquiring + return False + + while self.value <= 0: + future = self.loop.create_future() + self._waiters.append(future) + try: + await future + except: + future.cancel() + if self.value > 0 and not future.cancelled(): + self.wake_up() + raise + + self.value -= 1 + return True + + def release(self) -> None: + self.value += 1 + self.wake_up() + + +class MaxConcurrency: + __slots__ = ("number", "per", "wait", "_mapping") + + def __init__(self, number: int, *, per: BucketType, wait: bool) -> None: + self._mapping: dict[Any, _Semaphore] = {} + self.per: BucketType = per + self.number: int = number + self.wait: bool = wait + + if number <= 0: + raise ValueError("max_concurrency 'number' cannot be less than 1") + + if not isinstance(per, BucketType): + raise TypeError( + f"max_concurrency 'per' must be of type BucketType not {type(per)!r}" + ) + + def copy(self: MC) -> MC: + return self.__class__(self.number, per=self.per, wait=self.wait) + + def __repr__(self) -> str: + return ( + f"" + ) + + def get_key(self, message: Message) -> Any: + return self.per.get_key(message) + + async def acquire(self, message: Message) -> None: + key = self.get_key(message) + + try: + sem = self._mapping[key] + except KeyError: + self._mapping[key] = sem = _Semaphore(self.number) + + acquired = await sem.acquire(wait=self.wait) + if not acquired: + raise MaxConcurrencyReached(self.number, self.per) + + async def release(self, message: Message) -> None: + # Technically there's no reason for this function to be async + # But it might be more useful in the future + key = self.get_key(message) + + try: + sem = self._mapping[key] + except KeyError: + # ...? peculiar + return + else: + sem.release() + + if sem.value >= self.number and not sem.is_active(): + del self._mapping[key] diff --git a/discord/commands/core.py b/discord/commands/core.py index aff52c8108..05fc8c5924 100644 --- a/discord/commands/core.py +++ b/discord/commands/core.py @@ -26,43 +26,29 @@ from __future__ import annotations import asyncio -import datetime -import functools import inspect import re import sys import types from collections import OrderedDict from enum import Enum -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Coroutine, - Generator, - Generic, - TypeVar, - Union, -) +from typing import TYPE_CHECKING, Any, Callable, Generator, Generic, TypeVar, Union + +from discord import utils from ..channel import _threaded_guild_channel_factory from ..enums import Enum as DiscordEnum from ..enums import MessageType, SlashCommandOptionType, try_enum -from ..errors import ( - ApplicationCommandError, - ApplicationCommandInvokeError, - CheckFailure, - ClientException, - ValidationError, -) +from ..errors import CheckFailure, ClientException, DisabledCommand, ValidationError from ..member import Member from ..message import Attachment, Message from ..object import Object from ..role import Role from ..threads import Thread from ..user import User -from ..utils import MISSING, async_all, find, maybe_coroutine, utcnow +from ..utils import MISSING, find from .context import ApplicationContext, AutocompleteContext +from .mixins import CogT, Invokable, _BaseCommand from .options import Option, OptionChoice if sys.version_info >= (3, 11): @@ -86,78 +72,17 @@ ) if TYPE_CHECKING: - from typing_extensions import Concatenate, ParamSpec + from typing_extensions import ParamSpec from .. import Permissions - from ..cog import Cog - from ..ext.commands.cooldowns import CooldownMapping, MaxConcurrency - -T = TypeVar("T") -CogT = TypeVar("CogT", bound="Cog") -Coro = TypeVar("Coro", bound=Callable[..., Coroutine[Any, Any, Any]]) + from .cooldowns import CooldownMapping, MaxConcurrency + from .mixins import BaseContext -if TYPE_CHECKING: P = ParamSpec("P") else: P = TypeVar("P") - -def wrap_callback(coro): - from ..ext.commands.errors import CommandError - - @functools.wraps(coro) - async def wrapped(*args, **kwargs): - try: - ret = await coro(*args, **kwargs) - except ApplicationCommandError: - raise - except CommandError: - raise - except asyncio.CancelledError: - return - except Exception as exc: - raise ApplicationCommandInvokeError(exc) from exc - return ret - - return wrapped - - -def hooked_wrapped_callback(command, ctx, coro): - from ..ext.commands.errors import CommandError - - @functools.wraps(coro) - async def wrapped(arg): - try: - ret = await coro(arg) - except ApplicationCommandError: - raise - except CommandError: - raise - except asyncio.CancelledError: - return - except Exception as exc: - raise ApplicationCommandInvokeError(exc) from exc - finally: - if ( - hasattr(command, "_max_concurrency") - and command._max_concurrency is not None - ): - await command._max_concurrency.release(ctx) - await command.call_after_hooks(ctx) - return ret - - return wrapped - - -def unwrap_function(function: Callable[..., Any]) -> Callable[..., Any]: - partial = functools.partial - while True: - if hasattr(function, "__wrapped__"): - function = function.__wrapped__ - elif isinstance(function, partial): - function = function.func - else: - return function +T = TypeVar("T") def _validate_names(obj): @@ -174,51 +99,25 @@ def _validate_descriptions(obj): validate_chat_input_description(string, locale=locale) -class _BaseCommand: - __slots__ = () +class ApplicationCommand(Invokable, _BaseCommand, Generic[CogT, P, T]): + """Base class for all Application Commands, including: + - :class:`SlashCommand` + - :class:`SlashCommandGroup` + - :class:`ContextMenuCommand` which in turn is a superclass of + - :class:`MessageCommand` and + - :class:`UserCommand` + + This is a subclass of :class:`.Invokable`. + """ -class ApplicationCommand(_BaseCommand, Generic[CogT, P, T]): - __original_kwargs__: dict[str, Any] cog = None + parent: ApplicationCommand def __init__(self, func: Callable, **kwargs) -> None: - from ..ext.commands.cooldowns import BucketType, CooldownMapping, MaxConcurrency - - cooldown = getattr(func, "__commands_cooldown__", kwargs.get("cooldown")) - - if cooldown is None: - buckets = CooldownMapping(cooldown, BucketType.default) - elif isinstance(cooldown, CooldownMapping): - buckets = cooldown - else: - raise TypeError( - "Cooldown must be a an instance of CooldownMapping or None." - ) - - self._buckets: CooldownMapping = buckets - - max_concurrency = getattr( - func, "__commands_max_concurrency__", kwargs.get("max_concurrency") - ) - - self._max_concurrency: MaxConcurrency | None = max_concurrency - - self._callback = None - self.module = None - - self.name: str = kwargs.get("name", func.__name__) - - try: - checks = func.__commands_checks__ - checks.reverse() - except AttributeError: - checks = kwargs.get("checks", []) - - self.checks = checks + super().__init__(func, **kwargs) self.id: int | None = kwargs.get("id") self.guild_ids: list[int] | None = kwargs.get("guild_ids", None) - self.parent = kwargs.get("parent") # Permissions self.default_member_permissions: Permissions | None = getattr( @@ -246,347 +145,72 @@ def __eq__(self, other) -> bool: isinstance(other, self.__class__) and self.parent == other.parent and check ) - async def __call__(self, ctx, *args, **kwargs): - """|coro| - Calls the command's callback. - - This method bypasses all checks that a command has and does not - convert the arguments beforehand, so take care to pass the correct - arguments in. - """ - if self.cog is not None: - return await self.callback(self.cog, ctx, *args, **kwargs) - return await self.callback(ctx, *args, **kwargs) - - @property - def callback( - self, - ) -> ( - Callable[Concatenate[CogT, ApplicationContext, P], Coro[T]] - | Callable[Concatenate[ApplicationContext, P], Coro[T]] - ): - return self._callback - - @callback.setter - def callback( - self, - function: ( - Callable[Concatenate[CogT, ApplicationContext, P], Coro[T]] - | Callable[Concatenate[ApplicationContext, P], Coro[T]] - ), - ) -> None: - self._callback = function - unwrap = unwrap_function(function) - self.module = unwrap.__module__ - - def _prepare_cooldowns(self, ctx: ApplicationContext): - if self._buckets.valid: - current = datetime.datetime.now().timestamp() - bucket = self._buckets.get_bucket(ctx, current) # type: ignore # ctx instead of non-existent message - - if bucket is not None: - retry_after = bucket.update_rate_limit(current) - - if retry_after: - from ..ext.commands.errors import CommandOnCooldown - - raise CommandOnCooldown(bucket, retry_after, self._buckets.type) # type: ignore - - async def prepare(self, ctx: ApplicationContext) -> None: - # This should be same across all 3 types - ctx.command = self - - if not await self.can_run(ctx): - raise CheckFailure( - f"The check functions for the command {self.name} failed" - ) - - if self._max_concurrency is not None: - # For this application, context can be duck-typed as a Message - await self._max_concurrency.acquire(ctx) # type: ignore # ctx instead of non-existent message + def _get_signature_parameters(self): + return OrderedDict(inspect.signature(self.callback).parameters) - try: - self._prepare_cooldowns(ctx) - await self.call_before_hooks(ctx) - except: - if self._max_concurrency is not None: - await self._max_concurrency.release(ctx) # type: ignore # ctx instead of non-existent message - raise + async def _dispatch_error(self, ctx: BaseContext, error: Exception) -> None: + ctx.bot.dispatch("application_command_error", ctx, error) - def is_on_cooldown(self, ctx: ApplicationContext) -> bool: - """Checks whether the command is currently on cooldown. + async def can_run(self, ctx: ApplicationContext) -> bool: + """|coro| - .. note:: + Checks if the command can be executed by checking all the predicates + inside the :attr:`~ApplicationCommand.checks` attribute. This also checks whether the + command is disabled. - This uses the current time instead of the interaction time. + .. versionchanged:: 1.3 + Checks whether the command is disabled or not Parameters ---------- ctx: :class:`.ApplicationContext` - The invocation context to use when checking the command's cooldown status. + The ctx of the command currently being invoked. Returns ------- :class:`bool` - A boolean indicating if the command is on cooldown. - """ - if not self._buckets.valid: - return False - - bucket = self._buckets.get_bucket(ctx) - current = utcnow().timestamp() - return bucket.get_tokens(current) == 0 - - def reset_cooldown(self, ctx: ApplicationContext) -> None: - """Resets the cooldown on this command. - - Parameters - ---------- - ctx: :class:`.ApplicationContext` - The invocation context to reset the cooldown under. - """ - if self._buckets.valid: - bucket = self._buckets.get_bucket(ctx) # type: ignore # ctx instead of non-existent message - bucket.reset() - - def get_cooldown_retry_after(self, ctx: ApplicationContext) -> float: - """Retrieves the amount of seconds before this command can be tried again. - - .. note:: - - This uses the current time instead of the interaction time. - - Parameters - ---------- - ctx: :class:`.ApplicationContext` - The invocation context to retrieve the cooldown from. + A boolean indicating if the command can be invoked. - Returns - ------- - :class:`float` - The amount of time left on this command's cooldown in seconds. - If this is ``0.0`` then the command isn't on cooldown. + Raises + ------ + :class:`CommandError` + Any command error that was raised during a check call will be propagated + by this function. """ - if self._buckets.valid: - bucket = self._buckets.get_bucket(ctx) - current = utcnow().timestamp() - return bucket.get_retry_after(current) - return 0.0 + if not self.enabled: + raise DisabledCommand(f"{self.name} command is disabled") - async def invoke(self, ctx: ApplicationContext) -> None: - await self.prepare(ctx) - - injected = hooked_wrapped_callback(self, ctx, self._invoke) - await injected(ctx) + original = ctx.command + ctx.command = self - async def can_run(self, ctx: ApplicationContext) -> bool: - if not await ctx.bot.can_run(ctx): - raise CheckFailure( - f"The global check functions for command {self.name} failed." - ) + try: + if not await ctx.bot.can_run(ctx): + raise CheckFailure( + f"The global check functions for command {self.qualified_name} failed." + ) - predicates = self.checks - if self.parent is not None: - # parent checks should be run first - predicates = self.parent.checks + predicates + # since slash command parents don't really use checks, we can make it + # a feature to have "global" checks for slash commands only + predicates = self.checks + if self.parent is not None: + predicates = self.parent.checks + predicates - cog = self.cog - if cog is not None: - local_check = cog._get_overridden_method(cog.cog_check) - if local_check is not None: - ret = await maybe_coroutine(local_check, ctx) + if (cog := self.cog) and ( + local_check := cog._get_overridden_method(cog.cog_check) + ): + ret = await utils.maybe_coroutine(local_check, ctx) if not ret: return False - if not predicates: - # since we have no checks, then we just return True. - return True - - return await async_all(predicate(ctx) for predicate in predicates) # type: ignore + predicates = self.checks + if not predicates: + # since we have no checks, then we just return True. + return True - async def dispatch_error(self, ctx: ApplicationContext, error: Exception) -> None: - ctx.command_failed = True - cog = self.cog - try: - coro = self.on_error - except AttributeError: - pass - else: - injected = wrap_callback(coro) - if cog is not None: - await injected(cog, ctx, error) - else: - await injected(ctx, error) - - try: - if cog is not None: - local = cog.__class__._get_overridden_method(cog.cog_command_error) - if local is not None: - wrapped = wrap_callback(local) - await wrapped(ctx, error) + return await utils.async_all(predicate(ctx) for predicate in predicates) finally: - ctx.bot.dispatch("application_command_error", ctx, error) - - def _get_signature_parameters(self): - return OrderedDict(inspect.signature(self.callback).parameters) - - def error(self, coro): - """A decorator that registers a coroutine as a local error handler. - - A local error handler is an :func:`.on_command_error` event limited to - a single command. However, the :func:`.on_command_error` is still - invoked afterwards as the catch-all. - - Parameters - ---------- - coro: :ref:`coroutine ` - The coroutine to register as the local error handler. - - Raises - ------ - TypeError - The coroutine passed is not actually a coroutine. - """ - - if not asyncio.iscoroutinefunction(coro): - raise TypeError("The error handler must be a coroutine.") - - self.on_error = coro - return coro - - def has_error_handler(self) -> bool: - """Checks whether the command has an error handler registered.""" - return hasattr(self, "on_error") - - def before_invoke(self, coro): - """A decorator that registers a coroutine as a pre-invoke hook. - A pre-invoke hook is called directly before the command is - called. This makes it a useful function to set up database - connections or any type of set up required. - - This pre-invoke hook takes a sole parameter, a :class:`.ApplicationContext`. - See :meth:`.Bot.before_invoke` for more info. - - Parameters - ---------- - coro: :ref:`coroutine ` - The coroutine to register as the pre-invoke hook. - - Raises - ------ - TypeError - The coroutine passed is not actually a coroutine. - """ - if not asyncio.iscoroutinefunction(coro): - raise TypeError("The pre-invoke hook must be a coroutine.") - - self._before_invoke = coro - return coro - - def after_invoke(self, coro): - """A decorator that registers a coroutine as a post-invoke hook. - A post-invoke hook is called directly after the command is - called. This makes it a useful function to clean-up database - connections or any type of clean up required. - - This post-invoke hook takes a sole parameter, a :class:`.ApplicationContext`. - See :meth:`.Bot.after_invoke` for more info. - - Parameters - ---------- - coro: :ref:`coroutine ` - The coroutine to register as the post-invoke hook. - - Raises - ------ - TypeError - The coroutine passed is not actually a coroutine. - """ - if not asyncio.iscoroutinefunction(coro): - raise TypeError("The post-invoke hook must be a coroutine.") - - self._after_invoke = coro - return coro - - async def call_before_hooks(self, ctx: ApplicationContext) -> None: - # now that we're done preparing we can call the pre-command hooks - # first, call the command local hook: - cog = self.cog - if self._before_invoke is not None: - # should be cog if @commands.before_invoke is used - instance = getattr(self._before_invoke, "__self__", cog) - # __self__ only exists for methods, not functions - # however, if @command.before_invoke is used, it will be a function - if instance: - await self._before_invoke(instance, ctx) # type: ignore - else: - await self._before_invoke(ctx) # type: ignore - - # call the cog local hook if applicable: - if cog is not None: - hook = cog.__class__._get_overridden_method(cog.cog_before_invoke) - if hook is not None: - await hook(ctx) - - # call the bot global hook if necessary - hook = ctx.bot._before_invoke - if hook is not None: - await hook(ctx) - - async def call_after_hooks(self, ctx: ApplicationContext) -> None: - cog = self.cog - if self._after_invoke is not None: - instance = getattr(self._after_invoke, "__self__", cog) - if instance: - await self._after_invoke(instance, ctx) # type: ignore - else: - await self._after_invoke(ctx) # type: ignore - - # call the cog local hook if applicable: - if cog is not None: - hook = cog.__class__._get_overridden_method(cog.cog_after_invoke) - if hook is not None: - await hook(ctx) - - hook = ctx.bot._after_invoke - if hook is not None: - await hook(ctx) - - @property - def cooldown(self): - return self._buckets._cooldown - - @property - def full_parent_name(self) -> str: - """Retrieves the fully qualified parent command name. - - This the base command name required to execute it. For example, - in ``/one two three`` the parent name would be ``one two``. - """ - entries = [] - command = self - while command.parent is not None and hasattr(command.parent, "name"): - command = command.parent - entries.append(command.name) - - return " ".join(reversed(entries)) - - @property - def qualified_name(self) -> str: - """Retrieves the fully qualified command name. - - This is the full parent name with the command name as well. - For example, in ``/one two three`` the qualified name would be - ``one two three``. - """ - - parent = self.full_parent_name - - if parent: - return f"{parent} {self.name}" - else: - return self.name + ctx.command = original @property def qualified_id(self) -> int: @@ -602,36 +226,25 @@ def qualified_id(self) -> int: def to_dict(self) -> dict[str, Any]: raise NotImplementedError - def __str__(self) -> str: - return self.qualified_name - - def _set_cog(self, cog): - self.cog = cog - class SlashCommand(ApplicationCommand): - r"""A class that implements the protocol for a slash command. + """A class that implements the protocol for a slash command. These are not created manually, instead they are created via the decorator or functional interface. + This is a subclass of :class:`.Invokable`. + .. versionadded:: 2.0 Attributes - ----------- - name: :class:`str` - The name of the command. - callback: :ref:`coroutine ` - The coroutine that is executed when the command is called. - description: Optional[:class:`str`] - The description for the command. + ---------- guild_ids: Optional[List[:class:`int`]] The ids of the guilds where this command will be registered. options: List[:class:`Option`] The parameters for this command. parent: Optional[:class:`SlashCommandGroup`] - The parent group that this command belongs to. ``None`` if there - isn't one. + The parent group that this command belongs to. mention: :class:`str` Returns a string that allows you to mention the slash command. guild_only: :class:`bool` @@ -641,18 +254,6 @@ class SlashCommand(ApplicationCommand): Apps intending to be listed in the App Directory cannot have NSFW commands. default_member_permissions: :class:`~discord.Permissions` The default permissions a member needs to be able to run the command. - cog: Optional[:class:`Cog`] - The cog that this command belongs to. ``None`` if there isn't one. - checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]] - A list of predicates that verifies if the command could be executed - with the given :class:`.ApplicationContext` as the sole parameter. If an exception - is necessary to be thrown to signal failure, then one inherited from - :exc:`.ApplicationCommandError` should be used. Note that if the checks fail then - :exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error` - event. - cooldown: Optional[:class:`~discord.ext.commands.Cooldown`] - The cooldown applied when the command is invoked. ``None`` if the command - doesn't have a cooldown. name_localizations: Dict[:class:`str`, :class:`str`] The name localizations for this command. The values of this should be ``"locale": "name"``. See `here `_ for a list of valid locales. @@ -660,6 +261,7 @@ class SlashCommand(ApplicationCommand): The description localizations for this command. The values of this should be ``"locale": "description"``. See `here `_ for a list of valid locales. """ + type = 1 def __new__(cls, *args, **kwargs) -> SlashCommand: @@ -670,9 +272,6 @@ def __new__(cls, *args, **kwargs) -> SlashCommand: def __init__(self, func: Callable, *args, **kwargs) -> None: super().__init__(func, **kwargs) - if not asyncio.iscoroutinefunction(func): - raise TypeError("Callback must be a coroutine.") - self.callback = func self.name_localizations: dict[str, str] = kwargs.get( "name_localizations", MISSING @@ -695,17 +294,6 @@ def __init__(self, func: Callable, *args, **kwargs) -> None: self.options: list[Option] = kwargs.get("options", []) - try: - checks = func.__commands_checks__ - checks.reverse() - except AttributeError: - checks = kwargs.get("checks", []) - - self.checks = checks - - self._before_invoke = None - self._after_invoke = None - def _validate_parameters(self): params = self._get_signature_parameters() if kwop := self.options: @@ -793,7 +381,7 @@ def _parse_options(self, params, *, check_params: bool = True) -> list[Option]: return final_options - def _match_option_param_names(self, params, options): + def _match_option_param_names(self, params, options: list[Option]): params = self._check_required_params(params) check_annotations: list[Callable[[Option, type], bool]] = [ @@ -850,8 +438,11 @@ def cog(self): @cog.setter def cog(self, val): - self._cog = val - self._validate_parameters() + if not hasattr(self, "_cog"): + self._cog = MISSING + else: + self._cog = val + self._validate_parameters() @property def is_subcommand(self) -> bool: @@ -859,6 +450,7 @@ def is_subcommand(self) -> bool: @property def mention(self) -> str: + """:class:`str`: Returns a string that allows you to mention the slash command.""" return f"" def to_dict(self) -> dict: @@ -887,7 +479,9 @@ def to_dict(self) -> dict: return as_dict - async def _invoke(self, ctx: ApplicationContext) -> None: + async def _parse_arguments(self, ctx: ApplicationContext) -> None: + ctx.args = [ctx] if self.cog is None else [self.cog, ctx] + # TODO: Parse the args better kwargs = {} for arg in ctx.interaction.data.get("options", []): @@ -998,12 +592,7 @@ async def _invoke(self, ctx: ApplicationContext) -> None: if o._parameter_name not in kwargs: kwargs[o._parameter_name] = o.default - if self.cog is not None: - await self.callback(self.cog, ctx, **kwargs) - elif self.parent is not None and self.attached_to_group is True: - await self.callback(self.parent, ctx, **kwargs) - else: - await self.callback(ctx, **kwargs) + ctx.kwargs = kwargs async def invoke_autocomplete_callback(self, ctx: AutocompleteContext): values = {i.name: i.default for i in self.options} @@ -1036,54 +625,20 @@ async def invoke_autocomplete_callback(self, ctx: AutocompleteContext): choices=choices ) - def copy(self): - """Creates a copy of this command. - - Returns - ------- - :class:`SlashCommand` - A new instance of this command. - """ - ret = self.__class__(self.callback, **self.__original_kwargs__) - return self._ensure_assignment_on_copy(ret) - - def _ensure_assignment_on_copy(self, other): - other._before_invoke = self._before_invoke - other._after_invoke = self._after_invoke - if self.checks != other.checks: - other.checks = self.checks.copy() - # if self._buckets.valid and not other._buckets.valid: - # other._buckets = self._buckets.copy() - # if self._max_concurrency != other._max_concurrency: - # # _max_concurrency won't be None at this point - # other._max_concurrency = self._max_concurrency.copy() # type: ignore - - try: - other.on_error = self.on_error - except AttributeError: - pass - return other - - def _update_copy(self, kwargs: dict[str, Any]): - if kwargs: - kw = kwargs.copy() - kw.update(self.__original_kwargs__) - copy = self.__class__(self.callback, **kw) - return self._ensure_assignment_on_copy(copy) - else: - return self.copy() - +# TODO: implement with GroupMixin maybe class SlashCommandGroup(ApplicationCommand): - r"""A class that implements the protocol for a slash command group. + """A class that implements the protocol for a slash command group. These can be created manually, but they should be created via the decorator or functional interface. + This is a subclass of :class:`.Invokable`. + + .. versionadded:: 2.0 + Attributes - ----------- - name: :class:`str` - The name of the command. + ---------- description: Optional[:class:`str`] The description for the command. guild_ids: Optional[List[:class:`int`]] @@ -1098,13 +653,6 @@ class SlashCommandGroup(ApplicationCommand): Apps intending to be listed in the App Directory cannot have NSFW commands. default_member_permissions: :class:`~discord.Permissions` The default permissions a member needs to be able to run the command. - checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]] - A list of predicates that verifies if the command could be executed - with the given :class:`.ApplicationContext` as the sole parameter. If an exception - is necessary to be thrown to signal failure, then one inherited from - :exc:`.ApplicationCommandError` should be used. Note that if the checks fail then - :exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error` - event. name_localizations: Dict[:class:`str`, :class:`str`] The name localizations for this command. The values of this should be ``"locale": "name"``. See `here `_ for a list of valid locales. @@ -1112,6 +660,7 @@ class SlashCommandGroup(ApplicationCommand): The description localizations for this command. The values of this should be ``"locale": "description"``. See `here `_ for a list of valid locales. """ + __initial_commands__: list[SlashCommand | SlashCommandGroup] type = 1 @@ -1300,8 +849,9 @@ def create_subgroup( """ if self.parent is not None: - # TODO: Improve this error message - raise Exception("a subgroup cannot have a subgroup") + raise Exception( + "A command subgroup can only have commands and not any more groups." + ) sub_command_group = SlashCommandGroup( name, description, guild_ids, parent=self, **kwargs @@ -1353,12 +903,13 @@ def inner(cls: type[SlashCommandGroup]) -> SlashCommandGroup: return inner - async def _invoke(self, ctx: ApplicationContext) -> None: + async def invoke(self, ctx: ApplicationContext) -> None: option = ctx.interaction.data["options"][0] resolved = ctx.interaction.data.get("resolved", None) command = find(lambda x: x.name == option["name"], self.subcommands) option["resolved"] = resolved ctx.interaction.data = option + ctx.invoked_subcommand = command await command.invoke(ctx) async def invoke_autocomplete_callback(self, ctx: AutocompleteContext) -> None: @@ -1402,48 +953,6 @@ def walk_commands(self) -> Generator[SlashCommand | SlashCommandGroup, None, Non yield from command.walk_commands() yield command - def copy(self): - """Creates a copy of this command group. - - Returns - ------- - :class:`SlashCommandGroup` - A new instance of this command group. - """ - ret = self.__class__( - name=self.name, - description=self.description, - **{ - param: value - for param, value in self.__original_kwargs__.items() - if param not in ("name", "description") - }, - ) - return self._ensure_assignment_on_copy(ret) - - def _ensure_assignment_on_copy(self, other): - other.parent = self.parent - - other._before_invoke = self._before_invoke - other._after_invoke = self._after_invoke - - if self.subcommands != other.subcommands: - other.subcommands = self.subcommands.copy() - - if self.checks != other.checks: - other.checks = self.checks.copy() - - return other - - def _update_copy(self, kwargs: dict[str, Any]): - if kwargs: - kw = kwargs.copy() - kw.update(self.__original_kwargs__) - copy = self.__class__(self.callback, **kw) - return self._ensure_assignment_on_copy(copy) - else: - return self.copy() - def _set_cog(self, cog): super()._set_cog(cog) for subcommand in self.subcommands: @@ -1451,19 +960,16 @@ def _set_cog(self, cog): class ContextMenuCommand(ApplicationCommand): - r"""A class that implements the protocol for context menu commands. + """A base class that implements the protocol for context menu commands. - These are not created manually, instead they are created via the - decorator or functional interface. + These are not meant to be directly used, same as :class:`ApplicationCommand`. + + This is a subclass of :class:`.Invokable` but does not support the ``parent`` attribute. .. versionadded:: 2.0 Attributes - ----------- - name: :class:`str` - The name of the command. - callback: :ref:`coroutine ` - The coroutine that is executed when the command is called. + ---------- guild_ids: Optional[List[:class:`int`]] The ids of the guilds where this command will be registered. guild_only: :class:`bool` @@ -1473,18 +979,6 @@ class ContextMenuCommand(ApplicationCommand): Apps intending to be listed in the App Directory cannot have NSFW commands. default_member_permissions: :class:`~discord.Permissions` The default permissions a member needs to be able to run the command. - cog: Optional[:class:`Cog`] - The cog that this command belongs to. ``None`` if there isn't one. - checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]] - A list of predicates that verifies if the command could be executed - with the given :class:`.ApplicationContext` as the sole parameter. If an exception - is necessary to be thrown to signal failure, then one inherited from - :exc:`.ApplicationCommandError` should be used. Note that if the checks fail then - :exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error` - event. - cooldown: Optional[:class:`~discord.ext.commands.Cooldown`] - The cooldown applied when the command is invoked. ``None`` if the command - doesn't have a cooldown. name_localizations: Dict[:class:`str`, :class:`str`] The name localizations for this command. The values of this should be ``"locale": "name"``. See `here `_ for a list of valid locales. @@ -1498,9 +992,6 @@ def __new__(cls, *args, **kwargs) -> ContextMenuCommand: def __init__(self, func: Callable, *args, **kwargs) -> None: super().__init__(func, **kwargs) - if not asyncio.iscoroutinefunction(func): - raise TypeError("Callback must be a coroutine.") - self.callback = func self.name_localizations: dict[str, str] = kwargs.get( "name_localizations", MISSING @@ -1508,15 +999,10 @@ def __init__(self, func: Callable, *args, **kwargs) -> None: # Discord API doesn't support setting descriptions for context menu commands, so it must be empty self.description = "" - if not isinstance(self.name, str): - raise TypeError("Name of a command must be a string.") self.cog = None self.id = None - self._before_invoke = None - self._after_invoke = None - self.validate_parameters() # Context Menu commands can't have parents @@ -1556,10 +1042,6 @@ def validate_parameters(self): except StopIteration: pass - @property - def qualified_name(self): - return self.name - def to_dict(self) -> dict[str, str | int]: as_dict = { "name": self.name, @@ -1585,29 +1067,21 @@ def to_dict(self) -> dict[str, str | int]: class UserCommand(ContextMenuCommand): - r"""A class that implements the protocol for user context menu commands. + """A class that implements the protocol for user context menu commands. These are not created manually, instead they are created via the decorator or functional interface. + This is a subclass of :class:`.Invokable` but does not support the ``parent`` attribute. + + .. versionadded:: 2.0 + Attributes - ----------- - name: :class:`str` - The name of the command. - callback: :ref:`coroutine ` - The coroutine that is executed when the command is called. + ---------- guild_ids: Optional[List[:class:`int`]] The ids of the guilds where this command will be registered. - cog: Optional[:class:`.Cog`] - The cog that this command belongs to. ``None`` if there isn't one. - checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]] - A list of predicates that verifies if the command could be executed - with the given :class:`.ApplicationContext` as the sole parameter. If an exception - is necessary to be thrown to signal failure, then one inherited from - :exc:`.ApplicationCommandError` should be used. Note that if the checks fail then - :exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error` - event. """ + type = 2 def __new__(cls, *args, **kwargs) -> UserCommand: @@ -1644,68 +1118,23 @@ async def _invoke(self, ctx: ApplicationContext) -> None: else: await self.callback(ctx, target) - def copy(self): - """Creates a copy of this command. - - Returns - ------- - :class:`UserCommand` - A new instance of this command. - """ - ret = self.__class__(self.callback, **self.__original_kwargs__) - return self._ensure_assignment_on_copy(ret) - - def _ensure_assignment_on_copy(self, other): - other._before_invoke = self._before_invoke - other._after_invoke = self._after_invoke - if self.checks != other.checks: - other.checks = self.checks.copy() - # if self._buckets.valid and not other._buckets.valid: - # other._buckets = self._buckets.copy() - # if self._max_concurrency != other._max_concurrency: - # # _max_concurrency won't be None at this point - # other._max_concurrency = self._max_concurrency.copy() # type: ignore - - try: - other.on_error = self.on_error - except AttributeError: - pass - return other - - def _update_copy(self, kwargs: dict[str, Any]): - if kwargs: - kw = kwargs.copy() - kw.update(self.__original_kwargs__) - copy = self.__class__(self.callback, **kw) - return self._ensure_assignment_on_copy(copy) - else: - return self.copy() - class MessageCommand(ContextMenuCommand): - r"""A class that implements the protocol for message context menu commands. + """A class that implements the protocol for message context menu commands. These are not created manually, instead they are created via the decorator or functional interface. + This is a subclass of :class:`.Invokable` but does not support the ``parent`` attribute. + + .. versionadded:: 2.0 + Attributes - ----------- - name: :class:`str` - The name of the command. - callback: :ref:`coroutine ` - The coroutine that is executed when the command is called. + ---------- guild_ids: Optional[List[:class:`int`]] The ids of the guilds where this command will be registered. - cog: Optional[:class:`.Cog`] - The cog that this command belongs to. ``None`` if there isn't one. - checks: List[Callable[[:class:`.ApplicationContext`], :class:`bool`]] - A list of predicates that verifies if the command could be executed - with the given :class:`.ApplicationContext` as the sole parameter. If an exception - is necessary to be thrown to signal failure, then one inherited from - :exc:`.ApplicationCommandError` should be used. Note that if the checks fail then - :exc:`.CheckFailure` exception is raised to the :func:`.on_application_command_error` - event. """ + type = 3 def __new__(cls, *args, **kwargs) -> MessageCommand: @@ -1741,43 +1170,6 @@ async def _invoke(self, ctx: ApplicationContext): else: await self.callback(ctx, target) - def copy(self): - """Creates a copy of this command. - - Returns - ------- - :class:`MessageCommand` - A new instance of this command. - """ - ret = self.__class__(self.callback, **self.__original_kwargs__) - return self._ensure_assignment_on_copy(ret) - - def _ensure_assignment_on_copy(self, other): - other._before_invoke = self._before_invoke - other._after_invoke = self._after_invoke - if self.checks != other.checks: - other.checks = self.checks.copy() - # if self._buckets.valid and not other._buckets.valid: - # other._buckets = self._buckets.copy() - # if self._max_concurrency != other._max_concurrency: - # # _max_concurrency won't be None at this point - # other._max_concurrency = self._max_concurrency.copy() # type: ignore - - try: - other.on_error = self.on_error - except AttributeError: - pass - return other - - def _update_copy(self, kwargs: dict[str, Any]): - if kwargs: - kw = kwargs.copy() - kw.update(self.__original_kwargs__) - copy = self.__class__(self.callback, **kw) - return self._ensure_assignment_on_copy(copy) - else: - return self.copy() - def slash_command(**kwargs): """Decorator for slash commands that invokes :func:`application_command`. @@ -1820,13 +1212,11 @@ def message_command(**kwargs): def application_command(cls=SlashCommand, **attrs): """A decorator that transforms a function into an :class:`.ApplicationCommand`. More specifically, - usually one of :class:`.SlashCommand`, :class:`.UserCommand`, or :class:`.MessageCommand`. The exact class + one of :class:`.SlashCommand`, :class:`.UserCommand`, or :class:`.MessageCommand`. The exact class depends on the ``cls`` parameter. - By default, the ``description`` attribute is received automatically from the - docstring of the function and is cleaned up with the use of - ``inspect.cleandoc``. If the docstring is ``bytes``, then it is decoded - into :class:`str` using utf-8 encoding. - The ``name`` attribute also defaults to the function name unchanged. + + The ``description`` and ``name`` of the command are automatically inferred from the function name + and function docstring. .. versionadded:: 2.0 diff --git a/discord/commands/mixins.py b/discord/commands/mixins.py new file mode 100644 index 0000000000..095810773d --- /dev/null +++ b/discord/commands/mixins.py @@ -0,0 +1,934 @@ +""" +The MIT License (MIT) + +Copyright (c) 2015-2021 Rapptz +Copyright (c) 2021-present Pycord Development + +Permission is hereby granted, free of charge, to any person obtaining a +copy of this software and associated documentation files (the "Software"), +to deal in the Software without restriction, including without limitation +the rights to use, copy, modify, merge, publish, distribute, sublicense, +and/or sell copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. +""" + +from __future__ import annotations + +import asyncio +import datetime +import functools +from copy import copy +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Coroutine, + Generic, + TypeVar, + Union, + overload, +) + +from .. import abc, utils +from ..errors import ( + ApplicationCommandError, + CheckFailure, + CommandError, + CommandInvokeError, + CommandOnCooldown, + DisabledCommand, +) +from .cooldowns import BucketType, Cooldown, CooldownMapping, MaxConcurrency + +T = TypeVar("T") +BotT = TypeVar("BotT", bound="Union[Bot, AutoShardedBot]") +CogT = TypeVar("CogT", bound="Cog") + +Coro = Coroutine[Any, Any, T] +MaybeCoro = Union[T, Coro[T]] + +Check = Union[ + Callable[[CogT, "BaseContext"], MaybeCoro[bool]], + Callable[["BaseContext"], MaybeCoro[bool]], +] + +Error = Union[ + Callable[[CogT, "BaseContext[Any]", CommandError], Coro[Any]], + Callable[["BaseContext[Any]", CommandError], Coro[Any]], +] +ErrorT = TypeVar("ErrorT", bound="Error") + +Hook = Union[ + Callable[[CogT, "BaseContext"], Coro[Any]], Callable[["BaseContext"], Coro[Any]] +] +HookT = TypeVar("HookT", bound="Hook") + +if TYPE_CHECKING: + from typing_extensions import Concatenate, ParamSpec + + from ..abc import MessageableChannel + from ..bot import AutoShardedBot, Bot + from ..cog import Cog + from ..guild import Guild + from ..interactions import Interaction + from ..member import Member + from ..message import Message + from ..state import ConnectionState + from ..user import ClientUser, User + from ..voice_client import VoiceProtocol + + P = ParamSpec("P") + + Callback = ( + Callable[Concatenate[CogT, "BaseContext", P], Coro[T]] + | Callable[Concatenate["BaseContext", P], Coro[T]] + ) +else: + P = TypeVar("P") + Callback = TypeVar("Callback") + + +__all__ = ( + "Invokable", + "_BaseCommand", + "BaseContext", +) + + +def unwrap_function(function: functools.partial | Callable) -> Callback: + while True: + if hasattr(function, "__wrapped__"): + function = getattr(function, "__wrapped__") + elif isinstance(function, functools.partial): + function = function.func + else: + return function + + +def wrap_callback(coro: Callback): + @functools.wraps(coro) + async def wrapper(*args, **kwargs): + try: + ret = await coro(*args, **kwargs) + except CommandError: + raise + except asyncio.CancelledError: + return + except Exception as exc: + raise CommandInvokeError(exc) from exc + return ret + + return wrapper + + +def hook_wrapped_callback(command: Invokable, ctx: BaseContext, coro: Callback): + @functools.wraps(coro) + async def wrapper(*args, **kwargs): + try: + ret = await coro(*args, **kwargs) + except (ApplicationCommandError, CommandError): + ctx.command_failed = True + raise + except asyncio.CancelledError: + ctx.command_failed = True + return + except Exception as exc: + ctx.command_failed = True + raise CommandInvokeError(exc) from exc + finally: + if command._max_concurrency is not None: + await command._max_concurrency.release(ctx) + await command.call_after_hooks(ctx) + + return ret + + return wrapper + + +class _BaseCommand: + __slots__ = () + + +class BaseContext(abc.Messageable, Generic[BotT]): + r"""A base class to provide ***basic & common functionality*** between + :class:`.ApplicationContext` and :class:`~ext.commands.Context`. + + This is a subclass of :class:`~abc.Messageable` and can be used to + send messages, etc. + + .. versionadded:: 2.2 + + Attributes + ---------- + bot: :class:`.Bot` + The bot that contains the command being executed. + command: Optional[:class:`Invokable`] + The command that is being invoked currently. + args: :class:`list` + The list of transformed arguments that were passed into the command. + If this is accessed during the :func:`.on_command_error` event + then this list could be incomplete. + kwargs: :class:`dict` + A dictionary of transformed arguments that were passed into the command. + Similar to :attr:`args`\, if this is accessed in the + :func:`.on_command_error` event then this dict could be incomplete. + invoked_with: Optional[:class:`str`] + The command name that triggered this invocation. Useful for finding out + which alias called the command. + invoked_parents: List[:class:`str`] + The command names of the parents that triggered this invocation. Useful for + finding out which aliases called the command. + + For example in commands ``?a b c test``, the invoked parents are ``['a', 'b', 'c']``. + invoked_subcommand: Optional[:class:`Invokable`] + The subcommand that was invoked. + If no valid subcommand was invoked then this is equal to ``None``. + subcommand_passed: Optional[:class:`str`] + The string that was attempted to call a subcommand. This does not have + to point to a valid registered subcommand and could just point to a + nonsense string. If nothing was passed to attempt a call to a + subcommand then this is set to ``None``. + + .. note:: + + This will always be ``None`` if accessed on through a slash command. + + command_failed: :class:`bool` + A boolean that indicates if the command failed to be parsed, checked, + or invoked. + """ + + def __init__( + self, + bot: Bot, + command: Invokable | None, + args: list[Any] = utils.MISSING, + kwargs: dict[str, Any] = utils.MISSING, + *, + invoked_with: str | None = None, + invoked_parents: list[str] = utils.MISSING, + invoked_subcommand: Invokable | None = None, + subcommand_passed: str | None = None, + command_failed: bool = False, + ): + self.bot: Bot = bot + self.command: Invokable | None = command + self.args: list[Any] = args or [] + self.kwargs: dict[str, Any] = kwargs or {} + + self.invoked_with: str | None = invoked_with + if not self.invoked_with and command: + self.invoked_with = command.name + + self.invoked_parents: list[str] = invoked_parents or [] + if not self.invoked_parents and command: + self.invoked_parents = [i.name for i in command.parents] + + # This will always be None for slash commands + self.subcommand_passed: str | None = subcommand_passed + + self.invoked_subcommand: Invokable | None = invoked_subcommand + self.command_failed: bool = command_failed + + async def invoke( + self, command: Invokable[CogT, P, T], /, *args: P.args, **kwargs: P.kwargs + ) -> T: + r"""|coro| + + Invokes a command with the arguments given. + + This is useful if you want to just call the callback that a + :class:`.Invokable` holds internally. + + .. note:: + + This does not handle converters, checks, cooldowns, before-invoke, + or after-invoke hooks in any matter. It calls the internal callback + directly as-if it was a regular function. + + You must take care in passing the proper arguments when + using this function. + + Parameters + ----------- + command: :class:`.Invokable` + The command that is going to be called. + \*args + The arguments to use. + \*\*kwargs + The keyword arguments to use. + + Raises + ------- + TypeError + The command argument to invoke is missing. + """ + return await command(self, *args, **kwargs) + + async def _get_channel(self) -> abc.Messageable: + return self.channel + + @property + def source(self) -> Message | Interaction: + """Property to return a message or interaction + depending on the context. + """ + raise NotImplementedError + + @property + def _state(self) -> ConnectionState: + return self.source._state + + @property + def cog(self) -> Cog | None: + """Returns the cog associated with this context's command. + ``None`` if it does not exist. + """ + if self.command is None: + return None + return self.command.cog + + @utils.cached_property + def guild(self) -> Guild | None: + """Returns the guild associated with this context's command. + Shorthand for :attr:`.Interaction.guild`. + """ + return self.source.guild + + @utils.cached_property + def guild_id(self) -> int | None: + """Returns the ID of the guild associated with this context's command. + Shorthand for :attr:`.Interaction.guild_id`. + """ + return getattr(self.source, "guild_id", self.guild.id if self.guild else None) + + @utils.cached_property + def channel(self) -> MessageableChannel: + """Union[:class:`.abc.Messageable`]: Returns the channel associated with this context's command.""" + return self.source.channel + + @utils.cached_property + def channel_id(self) -> int | None: + """Returns the ID of the channel associated with this context's command. + Shorthand for :attr:`.Interaction.channel_id`. + """ + return getattr( + self.source, "channel_id", self.channel.id if self.channel else None + ) + + @utils.cached_property + def author(self) -> User | Member: + """Returns the user that sent this context's command. + Shorthand for :attr:`.Interaction.user`. + """ + return self.source.author + + @property + def user(self) -> User | Member: + """Alias for :attr:`BaseContext.author`.""" + return self.author + + @utils.cached_property + def me(self) -> Member | ClientUser: + """Similar to :attr:`.Guild.me` except it may return the :class:`.ClientUser` + in private message contexts, or when :meth:`.Intents.guilds` is absent. + """ + # bot.user will never be None at this point. + return self.guild.me if self.guild and self.guild.me else self.bot.user # type: ignore + + @property + def voice_client(self) -> VoiceProtocol | None: + """Returns the voice client associated with this context's command.""" + return self.guild.voice_client if self.guild else None + + +class Invokable(Generic[CogT, P, T]): + r"""A base class to provide ***basic & common functionality*** between + :class:`.ApplicationCommand` and :class:`~ext.commands.Command`. + + .. versionadded:: 2.2 + + Attributes + ---------- + name: str + The name of the invokable/command. + callback: :ref:`coroutine ` + The coroutine that is executed when the command is called. + parent: Optional[:class:`Invokable`] + The parent group of this command. + cog: Optional[:class:`Cog`] + The cog that this command belongs to. ``None`` if there isn't one. + enabled: :class:`bool` + A boolean that indicates if the command is currently enabled. + If the command is invoked while it is disabled, then + :exc:`.DisabledCommand` is raised to the :func:`.on_command_error` + event. Defaults to ``True``. + checks: List[Callable[[:class:`.BaseContext`], :class:`bool`]] + A list of predicates that verifies if the command could be executed with the given + :class:`.BaseContext` (:class:`.ApplicationContext` or :class:`~ext.commands.Context` + to be specific) as the sole parameter. If an exception is necessary to be thrown to + signal failure, then one inherited from :exc:`.CommandError` should be used. Note that + if the checks fail then :exc:`.CheckFailure` exception is raised to the :func:`.on_command_error` + event. + cooldown_after_parsing: :class:`bool` + If ``True``\, cooldown processing is done after argument parsing, + which calls converters. If ``False`` then cooldown processing is done + first and then the converters are called second. Defaults to ``False``. + cooldown: Optional[:class:`Cooldown`] + The cooldown applied when the command is invoked. + """ + __original_kwargs__: dict[str, Any] + + def __new__(cls, *args, **kwargs) -> Invokable: + self = super().__new__(cls) + + self.__original_kwargs__ = kwargs.copy() + return self + + def __init__( + self, + func: Callback, + name: str | None = None, + enabled: bool = True, + cooldown_after_parsing: bool = False, + parent: Invokable | None = None, + checks: list[Check] | None = None, + cooldown: CooldownMapping | None = None, + max_concurrency: MaxConcurrency | None = None, + **kwargs, + ): + self.callback: Callback = func + self.parent: Invokable | None = ( + parent if isinstance(parent, _BaseCommand) else None + ) + self.cog: CogT | None = None + self.module: Any = None + + self.name: str = str(name or func.__name__) + self.enabled: bool = enabled + self.cooldown_after_parsing: bool = cooldown_after_parsing + + if not checks: + checks = [] + + # checks + if _checks := getattr(func, "__commands_checks__", []): + # combine all that we find (kwargs or decorator) + _checks.reverse() + checks += _checks + + self.checks: list[Check] = checks + + # cooldowns + cooldown = getattr(func, "__commands_cooldown__", cooldown) + + if cooldown is None: + buckets = CooldownMapping(cooldown, BucketType.default) + elif isinstance(cooldown, CooldownMapping): + buckets = cooldown + else: + raise TypeError( + "Cooldown must be a an instance of CooldownMapping or None." + ) + + self._buckets: CooldownMapping = buckets + + # max concurrency + self._max_concurrency: MaxConcurrency | None = getattr( + func, "__commands_max_concurrency__", max_concurrency + ) + + # hooks + self._before_invoke: Hook | None = None + if hook := getattr(func, "__before_invoke__", None): + self.before_invoke(hook) + + self._after_invoke: Hook | None = None + if hook := getattr(func, "__after_invoke__", None): + self.after_invoke(hook) + + self.on_error: Error | None + + @property + def callback(self) -> Callback: + """Returns the command's callback.""" + return self._callback + + @callback.setter + def callback(self, func: Callback) -> None: + if not asyncio.iscoroutinefunction(func): + raise TypeError("Callback must be a coroutine.") + + self._callback = func + unwrap = unwrap_function(func) + self.module = unwrap.__module__ + + @property + def cooldown(self) -> Cooldown | None: + """Returns the cooldown for the command.""" + return self._buckets._cooldown + + @property + def qualified_name(self) -> str: + """Retrieves the fully qualified command name. + + This is the full name of the parent command with the subcommand name as well. + For example, in ``?one two three``, the qualified name would be + ``one two three``. + """ + if not self.parent: + return self.name + + return f"{self.parent.qualified_name} {self.name}" + + @property + def cog_name(self) -> str | None: + """Optional[:class:`str`]: The name of the cog this command belongs to, if any.""" + return type(self.cog).__cog_name__ if self.cog is not None else None + + @property + def parents(self) -> list[Invokable]: + """List[:class:`Invokable`]: Retrieves the parents of this command. + + If the command has no parents then it returns an empty :class:`list`. + + For example in commands ``?a b c test``, the parents are ``[c, b, a]``. + """ + entries = [] + command = self + while command.parent is not None: + command = command.parent + entries.append(command) + + return entries + + @property + def root_parent(self) -> Invokable | None: + """Optional[:class:`Invokable`]: Retrieves the root parent of this command. + + If the command has no parents then it returns ``None``. + + For example in commands ``?a b c test``, the root parent is ``a``. + """ + if not self.parent: + return None + return self.parents[-1] + + @property + def full_parent_name(self) -> str | None: + """Retrieves the fully qualified parent command name. + + This the base command name required to execute it. For example, + in ``/one two three`` the parent name would be ``one two``. + """ + if self.parent: + return self.parent.qualified_name + + def __str__(self) -> str: + return self.qualified_name + + async def __call__(self, ctx: BaseContext, *args: P.args, **kwargs: P.kwargs): + """|coro| + + Calls the internal callback that the command holds. + + .. note:: + + This bypasses all mechanisms -- including checks, converters, + invoke hooks, cooldowns, etc. You must take care to pass + the proper arguments and types to this function. + """ + new_args = (ctx, *args) + if self.cog is not None: + new_args = (self.cog, *new_args) + return await self.callback(*new_args, **kwargs) + + @overload + def update( + self, + *, + func: Callback | None = ..., + name: str | None = ..., + enabled: bool = False, + cooldown_after_parsing: bool = ..., + parent: Invokable | None = ..., + checks: list[Check] = ..., + cooldown: CooldownMapping | None = ..., + max_concurrency: MaxConcurrency | None = ..., + ) -> None: + ... + + @overload + def update(self) -> None: + ... + + def update(self, **kwargs) -> None: + """Updates the :class:`Invokable` instance with updated attribute. + + Similar to creating a new instance except it updates the current. + """ + self.__init__(self.callback, **dict(self.__original_kwargs__, **kwargs)) + + def has_error_handler(self) -> bool: + """Checks whether the command has an error handler registered.""" + return hasattr(self, "on_error") + + def before_invoke(self, coro: HookT) -> HookT: + """A decorator that registers a coroutine as a pre-invoke hook. + + A pre-invoke hook is called directly before the command is + called. This makes it a useful function to set up database + connections or any type of set up required. + + This pre-invoke hook takes a sole parameter, a :class:`.BaseContext`. + See :meth:`.Bot.before_invoke` for more info. + + Parameters + ---------- + coro: :ref:`coroutine ` + The coroutine to register as the pre-invoke hook. + + Raises + ------ + TypeError + The coroutine passed is not actually a coroutine. + """ + if not asyncio.iscoroutinefunction(coro): + raise TypeError("The pre-invoke hook must be a coroutine.") + + self._before_invoke = coro + return coro + + def after_invoke(self, coro: HookT) -> HookT: + """A decorator that registers a coroutine as a post-invoke hook. + + A post-invoke hook is called directly after the command is + called. This makes it a useful function to clean-up database + connections or any type of clean up required. + + This post-invoke hook takes a sole parameter, a :class:`.BaseContext`. + + See :meth:`.Bot.after_invoke` for more info. + + Parameters + ---------- + coro: :ref:`coroutine ` + The coroutine to register as the post-invoke hook. + + Raises + ------ + TypeError + The coroutine passed is not actually a coroutine. + """ + if not asyncio.iscoroutinefunction(coro): + raise TypeError("The post-invoke hook must be a coroutine.") + + self._after_invoke = coro + return coro + + def error(self, coro: ErrorT) -> ErrorT: + """A decorator that registers a coroutine as a local error handler. + A local error handler is an :func:`.on_command_error`/ + :func:`.on_application_command_error` event limited to a single command. + However, the actual :func:`.on_command_error`/:func:`.on_application_command_error` + is still invoked afterwards as the catch-all. + + Parameters + ---------- + coro: :ref:`coroutine ` + The coroutine to register as the local error handler. + + Raises + ------ + TypeError + The coroutine passed is not actually a coroutine. + """ + + if not asyncio.iscoroutinefunction(coro): + raise TypeError("The error handler must be a coroutine.") + + self.on_error: Error = coro + return coro + + def add_check(self, func: Check) -> None: + """Adds a check to the command. + + This is the non-decorator interface to :func:`.check`. + + Parameters + ---------- + func: Callable + The function that will be used as a check. + """ + + self.checks.append(func) + + def remove_check(self, func: Check) -> None: + """Removes a check from the command. + + This function is idempotent and will not raise an exception + if the function is not in the command's checks. + + Parameters + ---------- + func: Callable + The function to remove from the checks. + """ + + try: + self.checks.remove(func) + except ValueError: + pass + + def copy(self, overrides: dict[str, Any] | None = None): + """Creates a copy of this command. + + Returns + ------- + :class:`Invokable` + A new instance of this command. + """ + + thecopy = copy(self) + if overrides: + for attr, val in overrides.items(): + setattr(thecopy, attr, val) + + return thecopy + + def _set_cog(self, cog: CogT): + self.cog = cog + + def is_on_cooldown(self, ctx: BaseContext) -> bool: + """Checks whether the command is currently on cooldown. + + .. note:: + + This uses the current time instead of the interaction time. + + Parameters + ---------- + ctx: :class:`.BaseContext` + The invocation context to use when checking the command's cooldown status. + + Returns + ------- + :class:`bool` + A boolean indicating if the command is on cooldown. + """ + if not self._buckets.valid: + return False + + bucket = self._buckets.get_bucket(ctx) + current = utils.utcnow().timestamp() + return bucket.get_tokens(current) == 0 + + def reset_cooldown(self, ctx) -> None: + """Resets the cooldown on this command. + + Parameters + ---------- + ctx: :class:`.BaseContext` + The invocation context to reset the cooldown under. + """ + if self._buckets.valid: + bucket = self._buckets.get_bucket(ctx) # type: ignore # ctx instead of non-existent message + bucket.reset() + + def get_cooldown_retry_after(self, ctx) -> float: + """Retrieves the amount of seconds before this command can be tried again. + + .. note:: + + This uses the current time instead of the interaction time. + + Parameters + ---------- + ctx: :class:`.BaseContext` + The invocation context to retrieve the cooldown from. + + Returns + ------- + :class:`float` + The amount of time left on this command's cooldown in seconds. + If this is ``0.0`` then the command isn't on cooldown. + """ + if self._buckets.valid: + bucket = self._buckets.get_bucket(ctx) + current = utils.utcnow().timestamp() + return bucket.get_retry_after(current) + + return 0.0 + + def _prepare_cooldowns(self, ctx: BaseContext): + if not self._buckets.valid: + return + + current = datetime.datetime.now().timestamp() + bucket = self._buckets.get_bucket(ctx, current) # type: ignore # ctx instead of non-existent message + + if bucket: + retry_after = bucket.update_rate_limit(current) + + if retry_after: + raise CommandOnCooldown(bucket, retry_after, self._buckets.type) # type: ignore + + async def call_before_hooks(self, ctx: BaseContext) -> None: + # now that we're done preparing we can call the pre-command hooks + # first, call the command local hook: + cog = self.cog + if self._before_invoke is not None: + # should be cog if @commands.before_invoke is used + instance = getattr(self._before_invoke, "__self__", cog) + # __self__ only exists for methods, not functions + # however, if @command.before_invoke is used, it will be a function + if instance: + await self._before_invoke(instance, ctx) # type: ignore + else: + await self._before_invoke(ctx) # type: ignore + + # call the cog local hook if applicable: + if cog is not None: + hook = cog.__class__._get_overridden_method(cog.cog_before_invoke) + if hook is not None: + await hook(ctx) + + # call the bot global hook if necessary + hook = ctx.bot._before_invoke + if hook is not None: + await hook(ctx) + + async def call_after_hooks(self, ctx: BaseContext) -> None: + cog = self.cog + if self._after_invoke is not None: + instance = getattr(self._after_invoke, "__self__", cog) + if instance: + await self._after_invoke(instance, ctx) # type: ignore + else: + await self._after_invoke(ctx) # type: ignore + + # call the cog local hook if applicable: + if cog is not None: + hook = cog.__class__._get_overridden_method(cog.cog_after_invoke) + if hook is not None: + await hook(ctx) + + hook = ctx.bot._after_invoke + if hook is not None: + await hook(ctx) + + async def _parse_arguments(self, ctx: BaseContext) -> None: + """Parses arguments and attaches them to the context class (Union[:class:`~ext.commands.Context`, :class:`.ApplicationContext`])""" + raise NotImplementedError + + async def prepare(self, ctx: BaseContext) -> None: + ctx.command = self + + if not await self.can_run(ctx): + raise CheckFailure( + f"The check functions for command {self.qualified_name} failed." + ) + + if self._max_concurrency is not None: + # For this application, context can be duck-typed as a Message + await self._max_concurrency.acquire(ctx) # type: ignore + + try: + if self.cooldown_after_parsing: + await self._parse_arguments(ctx) + self._prepare_cooldowns(ctx) + else: + self._prepare_cooldowns(ctx) + await self._parse_arguments(ctx) + + await self.call_before_hooks(ctx) + except: + if self._max_concurrency is not None: + await self._max_concurrency.release(ctx) # type: ignore + raise + + async def invoke(self, ctx: BaseContext) -> None: + """Runs the command with checks. + + Parameters + ---------- + ctx: :class:`.BaseContext` + The context to pass into the command. + """ + await self.prepare(ctx) + + # terminate the invoked_subcommand chain. + # since we're in a regular command (and not a group) then + # the invoked subcommand is None. + ctx.invoked_subcommand = None + ctx.subcommand_passed = None + injected = hook_wrapped_callback(self, ctx, self.callback) + await injected(*ctx.args, **ctx.kwargs) + + async def reinvoke(self, ctx: BaseContext, *, call_hooks: bool = False) -> None: + """|coro| + + Calls the command again. + + This is similar to :meth:`Invokable.invoke` except that it bypasses + checks, cooldowns, and error handlers. + + Parameters + ---------- + ctx: BaseContext + The context to invoke with. + call_hooks: :class:`bool` + Whether to call the before and after invoke hooks. + """ + + ctx.command = self + await self._parse_arguments(ctx) + + if call_hooks: + await self.call_before_hooks(ctx) + + ctx.invoked_subcommand = None + try: + await self.callback(*ctx.args, **ctx.kwargs) # type: ignore + except: + ctx.command_failed = True + raise + finally: + if call_hooks: + await self.call_after_hooks(ctx) + + async def _dispatch_error(self, ctx: BaseContext, error: Exception) -> None: + # since I don't want to copy paste code, subclassed Contexts + # dispatch it to their corresponding events + raise NotImplementedError + + async def dispatch_error(self, ctx: BaseContext, error: Exception) -> None: + ctx.command_failed = True + cog = self.cog + + if coro := getattr(self, "on_error", None): + injected = wrap_callback(coro) + if cog is not None: + await injected(cog, ctx, error) + else: + await injected(ctx, error) + + try: + if cog is not None: + local = cog.__class__._get_overridden_method(cog.cog_command_error) + if local is not None: + wrapped = wrap_callback(local) + await wrapped(ctx, error) + finally: + await self._dispatch_error(ctx, error) diff --git a/discord/errors.py b/discord/errors.py index 589f4f2014..f3efc25672 100644 --- a/discord/errors.py +++ b/discord/errors.py @@ -37,6 +37,7 @@ except ModuleNotFoundError: _ResponseType = ClientResponse + from .commands.cooldowns import BucketType, Cooldown from .interactions import Interaction __all__ = ( @@ -61,9 +62,15 @@ "NoEntryPointError", "ExtensionFailed", "ExtensionNotFound", + "CommandError", + "CommandInvokeError", "ApplicationCommandError", - "CheckFailure", "ApplicationCommandInvokeError", + "CheckFailure", + "MaxConcurrencyReached", + "CommandOnCooldown", + "DisabledCommand", + "UserInputError", ) @@ -97,24 +104,6 @@ class ValidationError(DiscordException): """An Exception that is raised when there is a Validation Error.""" -def _flatten_error_dict(d: dict[str, Any], key: str = "") -> dict[str, str]: - items: list[tuple[str, str]] = [] - for k, v in d.items(): - new_key = f"{key}.{k}" if key else k - - if isinstance(v, dict): - try: - _errors: list[dict[str, Any]] = v["_errors"] - except KeyError: - items.extend(_flatten_error_dict(v, new_key).items()) - else: - items.append((new_key, " ".join(x.get("message", "") for x in _errors))) - else: - items.append((new_key, v)) - - return dict(items) - - class HTTPException(DiscordException): """Exception that's raised when an HTTP request operation fails. @@ -283,6 +272,123 @@ def __init__(self, interaction: Interaction): super().__init__("This interaction has already been responded to before") +# command errors + + +class CommandError(DiscordException): + r"""The base exception type for all command related errors. + + This inherits from :exc:`discord.DiscordException`. + + This exception and exceptions inherited from it are handled + in a special way as they are caught and passed into a special event + from :class:`.Bot`\, :func:`.on_command_error`. + """ + + def __init__(self, message: str | None = None, *args: Any) -> None: + if message is not None: + # clean-up @everyone and @here mentions + m = message.replace("@everyone", "@\u200beveryone").replace( + "@here", "@\u200bhere" + ) + super().__init__(m, *args) + else: + super().__init__(*args) + + +class CommandInvokeError(CommandError): + """Exception raised when the command being invoked raised an exception. + + This inherits from :exc:`CommandError` + + Attributes + ---------- + original: :exc:`Exception` + The original exception that was raised. You can also get this via + the ``__cause__`` attribute. + """ + + def __init__(self, e: Exception) -> None: + self.original: Exception = e + super().__init__(f"Command raised an exception: {e.__class__.__name__}: {e}") + + +ApplicationCommandError = CommandError +ApplicationCommandInvokeError = CommandInvokeError + + +class CheckFailure(CommandError): + """Exception raised when the predicates in :attr:`.Command.checks` have failed. + + This inherits from :exc:`CommandError` + """ + + +class MaxConcurrencyReached(CommandError): + """Exception raised when the command being invoked has reached its maximum concurrency. + + This inherits from :exc:`CommandError`. + + Attributes + ---------- + number: :class:`int` + The maximum number of concurrent invokers allowed. + per: :class:`.BucketType` + The bucket type passed to the :func:`.max_concurrency` decorator. + """ + + def __init__(self, number: int, per: BucketType) -> None: + self.number: int = number + self.per: BucketType = per + name = per.name + suffix = f"per {name}" if per.name != "default" else "globally" + plural = "%s times %s" if number > 1 else "%s time %s" + fmt = plural % (number, suffix) + super().__init__( + f"Too many people are using this command. It can only be used {fmt} concurrently." + ) + + +class CommandOnCooldown(CommandError): + """Exception raised when the command being invoked is on cooldown. + + This inherits from :exc:`CommandError` + + Attributes + ---------- + cooldown: :class:`.Cooldown` + A class with attributes ``rate`` and ``per`` similar to the + :func:`.cooldown` decorator. + type: :class:`BucketType` + The type associated with the cooldown. + retry_after: :class:`float` + The amount of seconds to wait before you can retry again. + """ + + def __init__( + self, cooldown: Cooldown, retry_after: float, type: BucketType + ) -> None: + self.cooldown: Cooldown = cooldown + self.retry_after: float = retry_after + self.type: BucketType = type + super().__init__(f"You are on cooldown. Try again in {retry_after:.2f}s") + + +class DisabledCommand(CommandError): + """Exception raised when the command being invoked is disabled. + + This inherits from :exc:`CommandError` + """ + + +class UserInputError(CommandError): + """The base exception type for errors that involve errors + regarding user input. + + This inherits from :exc:`CommandError`. + """ + + class ExtensionError(DiscordException): """Base exception for extension related errors. @@ -376,38 +482,19 @@ def __init__(self, name: str) -> None: super().__init__(msg, name=name) -class ApplicationCommandError(DiscordException): - r"""The base exception type for all application command related errors. - - This inherits from :exc:`DiscordException`. - - This exception and exceptions inherited from it are handled - in a special way as they are caught and passed into a special event - from :class:`.Bot`\, :func:`.on_command_error`. - """ - - -class CheckFailure(ApplicationCommandError): - """Exception raised when the predicates in :attr:`.Command.checks` have failed. - - This inherits from :exc:`ApplicationCommandError` - """ - - -class ApplicationCommandInvokeError(ApplicationCommandError): - """Exception raised when the command being invoked raised an exception. - - This inherits from :exc:`ApplicationCommandError` +def _flatten_error_dict(d: dict[str, Any], key: str = "") -> dict[str, str]: + items: list[tuple[str, str]] = [] + for k, v in d.items(): + new_key = f"{key}.{k}" if key else k - Attributes - ---------- - original: :exc:`Exception` - The original exception that was raised. You can also get this via - the ``__cause__`` attribute. - """ + if isinstance(v, dict): + try: + _errors: list[dict[str, Any]] = v["_errors"] + except KeyError: + items.extend(_flatten_error_dict(v, new_key).items()) + else: + items.append((new_key, " ".join(x.get("message", "") for x in _errors))) + else: + items.append((new_key, v)) - def __init__(self, e: Exception) -> None: - self.original: Exception = e - super().__init__( - f"Application Command raised an exception: {e.__class__.__name__}: {e}" - ) + return dict(items) diff --git a/discord/ext/commands/cog.py b/discord/ext/commands/cog.py index aa61fb24d0..b0bba0b8c8 100644 --- a/discord/ext/commands/cog.py +++ b/discord/ext/commands/cog.py @@ -29,7 +29,7 @@ import discord from ...cog import Cog -from ...commands import ApplicationCommand, SlashCommandGroup +from ...commands import ApplicationCommand, Invokable, SlashCommandGroup if TYPE_CHECKING: from .core import Command @@ -49,12 +49,12 @@ def __new__(cls: type[CogT], *args: Any, **kwargs: Any) -> CogT: # To do this, we need to interfere with the Cog creation process. return super().__new__(cls) - def walk_commands(self) -> Generator[Command, None, None]: + def walk_commands(self) -> Generator[Invokable, None, None]: """An iterator that recursively walks through this cog's commands and subcommands. Yields ------ - Union[:class:`.Command`, :class:`.Group`] + :class:`.Invokable` A command or group from the cog. """ from .core import GroupMixin @@ -74,7 +74,7 @@ def get_commands(self) -> list[ApplicationCommand | Command]: r""" Returns -------- - List[Union[:class:`~discord.ApplicationCommand`, :class:`.Command`]] + List[:class:`.Invokable`] A :class:`list` of commands that are defined inside this cog. .. note:: diff --git a/discord/ext/commands/context.py b/discord/ext/commands/context.py index 08d0f9f092..6580b965e0 100644 --- a/discord/ext/commands/context.py +++ b/discord/ext/commands/context.py @@ -32,19 +32,15 @@ import discord.utils from discord.message import Message +from ...commands import ApplicationContext, BaseContext + if TYPE_CHECKING: from typing_extensions import ParamSpec - from discord.abc import MessageableChannel - from discord.guild import Guild - from discord.member import Member - from discord.state import ConnectionState - from discord.user import ClientUser, User - from discord.voice_client import VoiceProtocol - from .bot import AutoShardedBot, Bot from .cog import Cog from .core import Command + from .help import HelpCommand from .view import StringView __all__ = ("Context",) @@ -62,8 +58,8 @@ P = TypeVar("P") -class Context(discord.abc.Messageable, Generic[BotT]): - r"""Represents the context in which a command is being invoked under. +class Context(BaseContext, Generic[BotT]): + """Represents the context in which a command is being invoked under. This class contains a lot of metadata to help you understand more about the invocation context. This class is not created manually and is instead @@ -72,19 +68,9 @@ class Context(discord.abc.Messageable, Generic[BotT]): This class implements the :class:`~discord.abc.Messageable` ABC. Attributes - ----------- + ---------- message: :class:`.Message` The message that triggered the command being executed. - bot: :class:`.Bot` - The bot that contains the command being executed. - args: :class:`list` - The list of transformed arguments that were passed into the command. - If this is accessed during the :func:`.on_command_error` event - then this list could be incomplete. - kwargs: :class:`dict` - A dictionary of transformed arguments that were passed into the command. - Similar to :attr:`args`\, if this is accessed in the - :func:`.on_command_error` event then this dict could be incomplete. current_parameter: Optional[:class:`inspect.Parameter`] The parameter that is currently being inspected and converted. This is only of use for within converters. @@ -92,32 +78,13 @@ class Context(discord.abc.Messageable, Generic[BotT]): .. versionadded:: 2.0 prefix: Optional[:class:`str`] The prefix that was used to invoke the command. - command: Optional[:class:`Command`] - The command that is being invoked currently. - invoked_with: Optional[:class:`str`] - The command name that triggered this invocation. Useful for finding out - which alias called the command. - invoked_parents: List[:class:`str`] - The command names of the parents that triggered this invocation. Useful for - finding out which aliases called the command. - - For example in commands ``?a b c test``, the invoked parents are ``['a', 'b', 'c']``. - - .. versionadded:: 1.7 - - invoked_subcommand: Optional[:class:`Command`] - The subcommand that was invoked. - If no valid subcommand was invoked then this is equal to ``None``. - subcommand_passed: Optional[:class:`str`] - The string that was attempted to call a subcommand. This does not have - to point to a valid registered subcommand and could just point to a - nonsense string. If nothing was passed to attempt a call to a - subcommand then this is set to ``None``. command_failed: :class:`bool` A boolean that indicates if the command failed to be parsed, checked, or invoked. """ + command: Command | None + def __init__( self, *, @@ -128,93 +95,22 @@ def __init__( kwargs: dict[str, Any] = MISSING, prefix: str | None = None, command: Command | None = None, - invoked_with: str | None = None, - invoked_parents: list[str] = MISSING, - invoked_subcommand: Command | None = None, - subcommand_passed: str | None = None, - command_failed: bool = False, current_parameter: inspect.Parameter | None = None, + **kwargs2: dict[str, Any], ): + super().__init__(bot=bot, command=command, args=args, kwargs=kwargs, **kwargs2) + self.message: Message = message - self.bot: BotT = bot - self.args: list[Any] = args or [] - self.kwargs: dict[str, Any] = kwargs or {} self.prefix: str | None = prefix - self.command: Command | None = command self.view: StringView = view - self.invoked_with: str | None = invoked_with - self.invoked_parents: list[str] = invoked_parents or [] - self.invoked_subcommand: Command | None = invoked_subcommand - self.subcommand_passed: str | None = subcommand_passed - self.command_failed: bool = command_failed self.current_parameter: inspect.Parameter | None = current_parameter - self._state: ConnectionState = self.message._state - - async def invoke( - self, command: Command[CogT, P, T], /, *args: P.args, **kwargs: P.kwargs - ) -> T: - r"""|coro| - - Calls a command with the arguments given. - - This is useful if you want to just call the callback that a - :class:`.Command` holds internally. - - .. note:: - - This does not handle converters, checks, cooldowns, pre-invoke, - or after-invoke hooks in any matter. It calls the internal callback - directly as-if it was a regular function. - You must take care in passing the proper arguments when - using this function. - - Parameters - ----------- - command: :class:`.Command` - The command that is going to be called. - \*args - The arguments to use. - \*\*kwargs - The keyword arguments to use. - - Raises - ------- - TypeError - The command argument to invoke is missing. - """ - return await command(self, *args, **kwargs) + @property + def source(self) -> Message: + return self.message + @discord.utils.copy_doc(ApplicationContext.reinvoke) async def reinvoke(self, *, call_hooks: bool = False, restart: bool = True) -> None: - """|coro| - - Calls the command again. - - This is similar to :meth:`~.Context.invoke` except that it bypasses - checks, cooldowns, and error handlers. - - .. note:: - - If you want to bypass :exc:`.UserInputError` derived exceptions, - it is recommended to use the regular :meth:`~.Context.invoke` - as it will work more naturally. After all, this will end up - using the old arguments the user has used and will thus just - fail again. - - Parameters - ---------- - call_hooks: :class:`bool` - Whether to call the before and after invoke hooks. - restart: :class:`bool` - Whether to start the call chain from the very beginning - or where we left off (i.e. the command that caused the error). - The default is to start where we left off. - - Raises - ------ - ValueError - The context to reinvoke is not valid. - """ cmd = self.command view = self.view if cmd is None: @@ -252,9 +148,6 @@ def valid(self) -> bool: """Checks if the invocation context is valid to be invoked with.""" return self.prefix is not None and self.command is not None - async def _get_channel(self) -> discord.abc.Messageable: - return self.channel - @property def clean_prefix(self) -> str: """The cleaned up invoke prefix. i.e. mentions are ``@name`` instead of ``<@id>``. @@ -272,52 +165,6 @@ def clean_prefix(self) -> str: pattern = re.compile(r"<@!?%s>" % user.id) return pattern.sub("@%s" % user.display_name.replace("\\", r"\\"), self.prefix) - @property - def cog(self) -> Cog | None: - """Returns the cog associated with this context's command. - None if it does not exist. - """ - - if self.command is None: - return None - return self.command.cog - - @discord.utils.cached_property - def guild(self) -> Guild | None: - """Returns the guild associated with this context's command. - None if not available. - """ - return self.message.guild - - @discord.utils.cached_property - def channel(self) -> MessageableChannel: - """Returns the channel associated with this context's command. - Shorthand for :attr:`.Message.channel`. - """ - return self.message.channel - - @discord.utils.cached_property - def author(self) -> User | Member: - """Union[:class:`~discord.User`, :class:`.Member`]: - Returns the author associated with this context's command. Shorthand for :attr:`.Message.author` - """ - return self.message.author - - @discord.utils.cached_property - def me(self) -> Member | ClientUser: - """Union[:class:`.Member`, :class:`.ClientUser`]: - Similar to :attr:`.Guild.me` except it may return the :class:`.ClientUser` in private message - message contexts, or when :meth:`Intents.guilds` is absent. - """ - # bot.user will never be None at this point. - return self.guild.me if self.guild is not None and self.guild.me is not None else self.bot.user # type: ignore - - @property - def voice_client(self) -> VoiceProtocol | None: - r"""A shortcut to :attr:`.Guild.voice_client`\, if applicable.""" - g = self.guild - return g.voice_client if g else None - async def send_help(self, *args: Any) -> Any: """send_help(entity=) @@ -348,7 +195,8 @@ async def send_help(self, *args: Any) -> Any: Any The result of the help command, if any. """ - from .core import Command, Group, wrap_callback + from ...commands.mixins import wrap_callback + from .core import Group from .errors import CommandError bot = self.bot diff --git a/discord/ext/commands/cooldowns.py b/discord/ext/commands/cooldowns.py index 55792aba36..bac752e19f 100644 --- a/discord/ext/commands/cooldowns.py +++ b/discord/ext/commands/cooldowns.py @@ -23,20 +23,10 @@ DEALINGS IN THE SOFTWARE. """ -from __future__ import annotations +# Cooldowns were moved to discord/commands/cooldowns.py +# This file acts as an alias for now -import asyncio -import time -from collections import deque -from typing import TYPE_CHECKING, Any, Callable, Deque, TypeVar - -from discord.enums import Enum - -from ...abc import PrivateChannel -from .errors import MaxConcurrencyReached - -if TYPE_CHECKING: - from ...message import Message +from ...commands.cooldowns import * __all__ = ( "BucketType", @@ -45,358 +35,3 @@ "DynamicCooldownMapping", "MaxConcurrency", ) - -C = TypeVar("C", bound="CooldownMapping") -MC = TypeVar("MC", bound="MaxConcurrency") - - -class BucketType(Enum): - default = 0 - user = 1 - guild = 2 - channel = 3 - member = 4 - category = 5 - role = 6 - - def get_key(self, msg: Message) -> Any: - if self is BucketType.user: - return msg.author.id - elif self is BucketType.guild: - return (msg.guild or msg.author).id - elif self is BucketType.channel: - return msg.channel.id - elif self is BucketType.member: - return (msg.guild and msg.guild.id), msg.author.id - elif self is BucketType.category: - return (msg.channel.category or msg.channel).id # type: ignore - elif self is BucketType.role: - # we return the channel id of a private-channel as there are only roles in guilds - # and that yields the same result as for a guild with only the @everyone role - # NOTE: PrivateChannel doesn't actually have an id attribute, but we assume we are - # receiving a DMChannel or GroupChannel which inherit from PrivateChannel and do - return (msg.channel if isinstance(msg.channel, PrivateChannel) else msg.author.top_role).id # type: ignore - - def __call__(self, msg: Message) -> Any: - return self.get_key(msg) - - -class Cooldown: - """Represents a cooldown for a command. - - Attributes - ---------- - rate: :class:`int` - The total number of tokens available per :attr:`per` seconds. - per: :class:`float` - The length of the cooldown period in seconds. - """ - - __slots__ = ("rate", "per", "_window", "_tokens", "_last") - - def __init__(self, rate: float, per: float) -> None: - self.rate: int = int(rate) - self.per: float = float(per) - self._window: float = 0.0 - self._tokens: int = self.rate - self._last: float = 0.0 - - def get_tokens(self, current: float | None = None) -> int: - """Returns the number of available tokens before rate limiting is applied. - - Parameters - ---------- - current: Optional[:class:`float`] - The time in seconds since Unix epoch to calculate tokens at. - If not supplied then :func:`time.time()` is used. - - Returns - ------- - :class:`int` - The number of tokens available before the cooldown is to be applied. - """ - if not current: - current = time.time() - - tokens = self._tokens - - if current > self._window + self.per: - tokens = self.rate - return tokens - - def get_retry_after(self, current: float | None = None) -> float: - """Returns the time in seconds until the cooldown will be reset. - - Parameters - ---------- - current: Optional[:class:`float`] - The current time in seconds since Unix epoch. - If not supplied, then :func:`time.time()` is used. - - Returns - ------- - :class:`float` - The number of seconds to wait before this cooldown will be reset. - """ - current = current or time.time() - tokens = self.get_tokens(current) - - if tokens == 0: - return self.per - (current - self._window) - - return 0.0 - - def update_rate_limit(self, current: float | None = None) -> float | None: - """Updates the cooldown rate limit. - - Parameters - ---------- - current: Optional[:class:`float`] - The time in seconds since Unix epoch to update the rate limit at. - If not supplied, then :func:`time.time()` is used. - - Returns - ------- - Optional[:class:`float`] - The retry-after time in seconds if rate limited. - """ - current = current or time.time() - self._last = current - - self._tokens = self.get_tokens(current) - - # first token used means that we start a new rate limit window - if self._tokens == self.rate: - self._window = current - - # check if we are rate limited - if self._tokens == 0: - return self.per - (current - self._window) - - # we're not so decrement our tokens - self._tokens -= 1 - - def reset(self) -> None: - """Reset the cooldown to its initial state.""" - self._tokens = self.rate - self._last = 0.0 - - def copy(self) -> Cooldown: - """Creates a copy of this cooldown. - - Returns - ------- - :class:`Cooldown` - A new instance of this cooldown. - """ - return Cooldown(self.rate, self.per) - - def __repr__(self) -> str: - return ( - f"" - ) - - -class CooldownMapping: - def __init__( - self, - original: Cooldown | None, - type: Callable[[Message], Any], - ) -> None: - if not callable(type): - raise TypeError("Cooldown type must be a BucketType or callable") - - self._cache: dict[Any, Cooldown] = {} - self._cooldown: Cooldown | None = original - self._type: Callable[[Message], Any] = type - - def copy(self) -> CooldownMapping: - ret = CooldownMapping(self._cooldown, self._type) - ret._cache = self._cache.copy() - return ret - - @property - def valid(self) -> bool: - return self._cooldown is not None - - @property - def type(self) -> Callable[[Message], Any]: - return self._type - - @classmethod - def from_cooldown(cls: type[C], rate, per, type) -> C: - return cls(Cooldown(rate, per), type) - - def _bucket_key(self, msg: Message) -> Any: - return self._type(msg) - - def _verify_cache_integrity(self, current: float | None = None) -> None: - # we want to delete all cache objects that haven't been used - # in a cooldown window. e.g. if we have a command that has a - # cooldown of 60s, and it has not been used in 60s then that key should be deleted - current = current or time.time() - dead_keys = [k for k, v in self._cache.items() if current > v._last + v.per] - for k in dead_keys: - del self._cache[k] - - def create_bucket(self, message: Message) -> Cooldown: - return self._cooldown.copy() # type: ignore - - def get_bucket(self, message: Message, current: float | None = None) -> Cooldown: - if self._type is BucketType.default: - return self._cooldown # type: ignore - - self._verify_cache_integrity(current) - key = self._bucket_key(message) - if key not in self._cache: - bucket = self.create_bucket(message) - if bucket is not None: - self._cache[key] = bucket - else: - bucket = self._cache[key] - - return bucket - - def update_rate_limit( - self, message: Message, current: float | None = None - ) -> float | None: - bucket = self.get_bucket(message, current) - return bucket.update_rate_limit(current) - - -class DynamicCooldownMapping(CooldownMapping): - def __init__( - self, factory: Callable[[Message], Cooldown], type: Callable[[Message], Any] - ) -> None: - super().__init__(None, type) - self._factory: Callable[[Message], Cooldown] = factory - - def copy(self) -> DynamicCooldownMapping: - ret = DynamicCooldownMapping(self._factory, self._type) - ret._cache = self._cache.copy() - return ret - - @property - def valid(self) -> bool: - return True - - def create_bucket(self, message: Message) -> Cooldown: - return self._factory(message) - - -class _Semaphore: - """This class is a version of a semaphore. - - If you're wondering why asyncio.Semaphore isn't being used, - it's because it doesn't expose the internal value. This internal - value is necessary because I need to support both `wait=True` and - `wait=False`. - - An asyncio.Queue could have been used to do this as well -- but it is - not as efficient since internally that uses two queues and is a bit - overkill for what is basically a counter. - """ - - __slots__ = ("value", "loop", "_waiters") - - def __init__(self, number: int) -> None: - self.value: int = number - self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() - self._waiters: Deque[asyncio.Future] = deque() - - def __repr__(self) -> str: - return f"<_Semaphore value={self.value} waiters={len(self._waiters)}>" - - def locked(self) -> bool: - return self.value == 0 - - def is_active(self) -> bool: - return len(self._waiters) > 0 - - def wake_up(self) -> None: - while self._waiters: - future = self._waiters.popleft() - if not future.done(): - future.set_result(None) - return - - async def acquire(self, *, wait: bool = False) -> bool: - if not wait and self.value <= 0: - # signal that we're not acquiring - return False - - while self.value <= 0: - future = self.loop.create_future() - self._waiters.append(future) - try: - await future - except: - future.cancel() - if self.value > 0 and not future.cancelled(): - self.wake_up() - raise - - self.value -= 1 - return True - - def release(self) -> None: - self.value += 1 - self.wake_up() - - -class MaxConcurrency: - __slots__ = ("number", "per", "wait", "_mapping") - - def __init__(self, number: int, *, per: BucketType, wait: bool) -> None: - self._mapping: dict[Any, _Semaphore] = {} - self.per: BucketType = per - self.number: int = number - self.wait: bool = wait - - if number <= 0: - raise ValueError("max_concurrency 'number' cannot be less than 1") - - if not isinstance(per, BucketType): - raise TypeError( - f"max_concurrency 'per' must be of type BucketType not {type(per)!r}" - ) - - def copy(self: MC) -> MC: - return self.__class__(self.number, per=self.per, wait=self.wait) - - def __repr__(self) -> str: - return ( - f"" - ) - - def get_key(self, message: Message) -> Any: - return self.per.get_key(message) - - async def acquire(self, message: Message) -> None: - key = self.get_key(message) - - try: - sem = self._mapping[key] - except KeyError: - self._mapping[key] = sem = _Semaphore(self.number) - - acquired = await sem.acquire(wait=self.wait) - if not acquired: - raise MaxConcurrencyReached(self.number, self.per) - - async def release(self, message: Message) -> None: - # Technically there's no reason for this function to be async - # But it might be more useful in the future - key = self.get_key(message) - - try: - sem = self._mapping[key] - except KeyError: - # ...? peculiar - return - else: - sem.release() - - if sem.value >= self.number and not sem.is_active(): - del self._mapping[key] diff --git a/discord/ext/commands/core.py b/discord/ext/commands/core.py index c2282d28bf..3121b4b1a3 100644 --- a/discord/ext/commands/core.py +++ b/discord/ext/commands/core.py @@ -24,8 +24,6 @@ """ from __future__ import annotations -import asyncio -import datetime import functools import inspect import types @@ -42,18 +40,20 @@ ) import discord +from discord import utils from ...commands import ( ApplicationCommand, + BaseContext, Option, _BaseCommand, message_command, slash_command, user_command, ) +from ...commands.mixins import CogT, Invokable, hook_wrapped_callback, unwrap_function from ...enums import ChannelType from ...errors import * -from .cog import Cog from .context import Context from .converter import Greedy, get_converter, run_converters from .cooldowns import ( @@ -65,6 +65,8 @@ ) from .errors import * +T = TypeVar("T") + if TYPE_CHECKING: from typing_extensions import Concatenate, ParamSpec, TypeGuard @@ -72,6 +74,22 @@ from ._types import Check, Coro, CoroFunc, Error, Hook + P = ParamSpec("P") + + CommandT = TypeVar("CommandT", bound="Command") + ContextT = TypeVar("ContextT", bound="Context") + # CHT = TypeVar('CHT', bound='Check') + GroupT = TypeVar("GroupT", bound="Group") + HookT = TypeVar("HookT", bound="Hook") + ErrorT = TypeVar("ErrorT", bound="Error") + + CallbackT = Union[ + Callable[[Concatenate[CogT, ContextT, P]], Coro[T]], + Callable[[Concatenate[ContextT, P]], Coro[T]], + ] +else: + P = TypeVar("P") + __all__ = ( "Command", @@ -105,31 +123,6 @@ MISSING: Any = discord.utils.MISSING -T = TypeVar("T") -CogT = TypeVar("CogT", bound="Cog") -CommandT = TypeVar("CommandT", bound="Command") -ContextT = TypeVar("ContextT", bound="Context") -# CHT = TypeVar('CHT', bound='Check') -GroupT = TypeVar("GroupT", bound="Group") -HookT = TypeVar("HookT", bound="Hook") -ErrorT = TypeVar("ErrorT", bound="Error") - -if TYPE_CHECKING: - P = ParamSpec("P") -else: - P = TypeVar("P") - - -def unwrap_function(function: Callable[..., Any]) -> Callable[..., Any]: - partial = functools.partial - while True: - if hasattr(function, "__wrapped__"): - function = function.__wrapped__ - elif isinstance(function, partial): - function = function.func - else: - return function - def get_signature_parameters( function: Callable[..., Any], globalns: dict[str, Any] @@ -156,46 +149,6 @@ def get_signature_parameters( return params -def wrap_callback(coro): - @functools.wraps(coro) - async def wrapped(*args, **kwargs): - try: - ret = await coro(*args, **kwargs) - except CommandError: - raise - except asyncio.CancelledError: - return - except Exception as exc: - raise CommandInvokeError(exc) from exc - return ret - - return wrapped - - -def hooked_wrapped_callback(command, ctx, coro): - @functools.wraps(coro) - async def wrapped(*args, **kwargs): - try: - ret = await coro(*args, **kwargs) - except CommandError: - ctx.command_failed = True - raise - except asyncio.CancelledError: - ctx.command_failed = True - return - except Exception as exc: - ctx.command_failed = True - raise CommandInvokeError(exc) from exc - finally: - if command._max_concurrency is not None: - await command._max_concurrency.release(ctx) - - await command.call_after_hooks(ctx) - return ret - - return wrapped - - class _CaseInsensitiveDict(dict): def __contains__(self, k): return super().__contains__(k.casefold()) @@ -216,18 +169,16 @@ def __setitem__(self, k, v): super().__setitem__(k.casefold(), v) -class Command(_BaseCommand, Generic[CogT, P, T]): +class Command(Invokable, _BaseCommand, Generic[CogT, P, T]): r"""A class that implements the protocol for a bot text command. These are not created manually, instead they are created via the decorator or functional interface. + This is a subclass of :class:`Invokable`. + Attributes ----------- - name: :class:`str` - The name of the command. - callback: :ref:`coroutine ` - The coroutine that is executed when the command is called. help: Optional[:class:`str`] The long help text for the command. brief: Optional[:class:`str`] @@ -236,23 +187,8 @@ class Command(_BaseCommand, Generic[CogT, P, T]): A replacement for arguments in the default help text. aliases: Union[List[:class:`str`], Tuple[:class:`str`]] The list of aliases the command can be invoked under. - enabled: :class:`bool` - A boolean that indicates if the command is currently enabled. - If the command is invoked while it is disabled, then - :exc:`.DisabledCommand` is raised to the :func:`.on_command_error` - event. Defaults to ``True``. parent: Optional[:class:`Group`] - The parent group that this command belongs to. ``None`` if there - isn't one. - cog: Optional[:class:`Cog`] - The cog that this command belongs to. ``None`` if there isn't one. - checks: List[Callable[[:class:`.Context`], :class:`bool`]] - A list of predicates that verifies if the command could be executed - with the given :class:`.Context` as the sole parameter. If an exception - is necessary to be thrown to signal failure, then one inherited from - :exc:`.CommandError` should be used. Note that if the checks fail then - :exc:`.CheckFailure` exception is raised to the :func:`.on_command_error` - event. + The parent group that this command belongs to. description: :class:`str` The message prefixed into the default help command. hidden: :class:`bool` @@ -265,8 +201,6 @@ class Command(_BaseCommand, Generic[CogT, P, T]): regular matter rather than passing the rest completely raw. If ``True`` then the keyword-only argument will pass in the rest of the arguments in a completely raw matter. Defaults to ``False``. - invoked_subcommand: Optional[:class:`Command`] - The subcommand that was invoked, if any. require_var_positional: :class:`bool` If ``True`` and a variadic positional argument is specified, requires the user to specify at least one argument. Defaults to ``False``. @@ -278,26 +212,16 @@ class Command(_BaseCommand, Generic[CogT, P, T]): requirements are met (e.g. ``?foo a b c`` when only expecting ``a`` and ``b``). Otherwise :func:`.on_command_error` and local error handlers are called with :exc:`.TooManyArguments`. Defaults to ``True``. - cooldown_after_parsing: :class:`bool` - If ``True``\, cooldown processing is done after argument parsing, - which calls converters. If ``False`` then cooldown processing is done - first and then the converters are called second. Defaults to ``False``. extras: :class:`dict` A dict of user provided extras to attach to the Command. .. note:: This object may be copied by the library. - - .. versionadded:: 2.0 - - cooldown: Optional[:class:`Cooldown`] - The cooldown applied when the command is invoked. ``None`` if the command - doesn't have a cooldown. - .. versionadded:: 2.0 """ __original_kwargs__: dict[str, Any] + _callback: CallbackT def __new__(cls: type[CommandT], *args: Any, **kwargs: Any) -> CommandT: # if you're wondering why this is done, it's because we need to ensure @@ -317,24 +241,10 @@ def __new__(cls: type[CommandT], *args: Any, **kwargs: Any) -> CommandT: def __init__( self, - func: ( - Callable[Concatenate[CogT, ContextT, P], Coro[T]] - | Callable[Concatenate[ContextT, P], Coro[T]] - ), + func: CallbackT, **kwargs: Any, ): - if not asyncio.iscoroutinefunction(func): - raise TypeError("Callback must be a coroutine.") - - name = kwargs.get("name") or func.__name__ - if not isinstance(name, str): - raise TypeError("Name of a command must be a string.") - self.name: str = name - - self.callback = func - self.enabled: bool = kwargs.get("enabled", True) - - help_doc = kwargs.get("help") + help_doc = kwargs.pop("help", None) if help_doc is not None: help_doc = inspect.cleandoc(help_doc) else: @@ -343,95 +253,35 @@ def __init__( help_doc = help_doc.decode("utf-8") self.help: str | None = help_doc - - self.brief: str | None = kwargs.get("brief") - self.usage: str | None = kwargs.get("usage") - self.rest_is_raw: bool = kwargs.get("rest_is_raw", False) - self.aliases: list[str] | tuple[str] = kwargs.get("aliases", []) - self.extras: dict[str, Any] = kwargs.get("extras", {}) + self.brief: str | None = kwargs.pop("brief", None) + self.usage: str | None = kwargs.pop("usage", None) + self.rest_is_raw: bool = kwargs.pop("rest_is_raw", False) + self.aliases: list[str] | tuple[str] = kwargs.pop("aliases", []) + self.extras: dict[str, Any] = kwargs.pop("extras", {}) if not isinstance(self.aliases, (list, tuple)): raise TypeError( "Aliases of a command must be a list or a tuple of strings." ) - self.description: str = inspect.cleandoc(kwargs.get("description", "")) - self.hidden: bool = kwargs.get("hidden", False) - - try: - checks = func.__commands_checks__ - checks.reverse() - except AttributeError: - checks = kwargs.get("checks", []) + self.description: str = inspect.cleandoc(kwargs.pop("description", "")) + self.hidden: bool = kwargs.pop("hidden", False) - self.checks: list[Check] = checks - - try: - cooldown = func.__commands_cooldown__ - except AttributeError: - cooldown = kwargs.get("cooldown") - - if cooldown is None: - buckets = CooldownMapping(cooldown, BucketType.default) - elif isinstance(cooldown, CooldownMapping): - buckets = cooldown - else: - raise TypeError( - "Cooldown must be a an instance of CooldownMapping or None." - ) - self._buckets: CooldownMapping = buckets - - try: - max_concurrency = func.__commands_max_concurrency__ - except AttributeError: - max_concurrency = kwargs.get("max_concurrency") - - self._max_concurrency: MaxConcurrency | None = max_concurrency - - self.require_var_positional: bool = kwargs.get("require_var_positional", False) - self.ignore_extra: bool = kwargs.get("ignore_extra", True) - self.cooldown_after_parsing: bool = kwargs.get("cooldown_after_parsing", False) - self.cog: CogT | None = None - - # bandaid for the fact that sometimes parent can be the bot instance - parent = kwargs.get("parent") - self.parent: GroupMixin | None = parent if isinstance(parent, _BaseCommand) else None # type: ignore - - self._before_invoke: Hook | None = None - try: - before_invoke = func.__before_invoke__ - except AttributeError: - pass - else: - self.before_invoke(before_invoke) + self.require_var_positional: bool = kwargs.pop("require_var_positional", False) + self.ignore_extra: bool = kwargs.pop("ignore_extra", True) - self._after_invoke: Hook | None = None - try: - after_invoke = func.__after_invoke__ - except AttributeError: - pass - else: - self.after_invoke(after_invoke) + super().__init__(func, **kwargs) @property def callback( self, - ) -> ( - Callable[Concatenate[CogT, Context, P], Coro[T]] - | Callable[Concatenate[Context, P], Coro[T]] - ): + ) -> CallbackT: return self._callback @callback.setter - def callback( - self, - function: ( - Callable[Concatenate[CogT, Context, P], Coro[T]] - | Callable[Concatenate[Context, P], Coro[T]] - ), - ) -> None: - self._callback = function - unwrap = unwrap_function(function) + def callback(self, func: CallbackT) -> None: + self._callback = func + unwrap = unwrap_function(func) self.module = unwrap.__module__ try: @@ -439,128 +289,10 @@ def callback( except AttributeError: globalns = {} - self.params = get_signature_parameters(function, globalns) - - def add_check(self, func: Check) -> None: - """Adds a check to the command. - - This is the non-decorator interface to :func:`.check`. - - .. versionadded:: 1.3 - - Parameters - ---------- - func - The function that will be used as a check. - """ - - self.checks.append(func) - - def remove_check(self, func: Check) -> None: - """Removes a check from the command. + self.params = get_signature_parameters(func, globalns) - This function is idempotent and will not raise an exception - if the function is not in the command's checks. - - .. versionadded:: 1.3 - - Parameters - ---------- - func - The function to remove from the checks. - """ - - try: - self.checks.remove(func) - except ValueError: - pass - - def update(self, **kwargs: Any) -> None: - """Updates :class:`Command` instance with updated attribute. - - This works similarly to the :func:`.command` decorator in terms - of parameters in that they are passed to the :class:`Command` or - subclass constructors, sans the name and callback. - """ - self.__init__(self.callback, **dict(self.__original_kwargs__, **kwargs)) - - async def __call__(self, context: Context, *args: P.args, **kwargs: P.kwargs) -> T: - """|coro| - - Calls the internal callback that the command holds. - - .. note:: - - This bypasses all mechanisms -- including checks, converters, - invoke hooks, cooldowns, etc. You must take care to pass - the proper arguments and types to this function. - - .. versionadded:: 1.3 - """ - if self.cog is not None: - return await self.callback(self.cog, context, *args, **kwargs) # type: ignore - else: - return await self.callback(context, *args, **kwargs) # type: ignore - - def _ensure_assignment_on_copy(self, other: CommandT) -> CommandT: - other._before_invoke = self._before_invoke - other._after_invoke = self._after_invoke - if self.checks != other.checks: - other.checks = self.checks.copy() - if self._buckets.valid and not other._buckets.valid: - other._buckets = self._buckets.copy() - if self._max_concurrency != other._max_concurrency: - # _max_concurrency won't be None at this point - other._max_concurrency = self._max_concurrency.copy() # type: ignore - - try: - other.on_error = self.on_error - except AttributeError: - pass - return other - - def copy(self: CommandT) -> CommandT: - """Creates a copy of this command. - - Returns - ------- - :class:`Command` - A new instance of this command. - """ - ret = self.__class__(self.callback, **self.__original_kwargs__) - return self._ensure_assignment_on_copy(ret) - - def _update_copy(self: CommandT, kwargs: dict[str, Any]) -> CommandT: - if kwargs: - kw = kwargs.copy() - kw.update(self.__original_kwargs__) - copy = self.__class__(self.callback, **kw) - return self._ensure_assignment_on_copy(copy) - else: - return self.copy() - - async def dispatch_error(self, ctx: Context, error: Exception) -> None: - ctx.command_failed = True - cog = self.cog - try: - coro = self.on_error - except AttributeError: - pass - else: - injected = wrap_callback(coro) - if cog is not None: - await injected(cog, ctx, error) - else: - await injected(ctx, error) - - try: - if cog is not None: - local = Cog._get_overridden_method(cog.cog_command_error) - if local is not None: - wrapped = wrap_callback(local) - await wrapped(ctx, error) - finally: - ctx.bot.dispatch("command_error", ctx, error) + async def _dispatch_error(self, ctx: Context, error: Exception) -> None: + ctx.bot.dispatch("command_error", ctx, error) async def transform(self, ctx: Context, param: inspect.Parameter) -> Any: if isinstance(param.annotation, Option): @@ -684,70 +416,6 @@ def clean_params(self) -> dict[str, inspect.Parameter]: return result - @property - def full_parent_name(self) -> str: - """Retrieves the fully qualified parent command name. - - This the base command name required to execute it. For example, - in ``?one two three`` the parent name would be ``one two``. - """ - entries = [] - command = self - # command.parent is type-hinted as GroupMixin some attributes are resolved via MRO - while command.parent is not None: # type: ignore - command = command.parent # type: ignore - entries.append(command.name) # type: ignore - - return " ".join(reversed(entries)) - - @property - def parents(self) -> list[Group]: - """Retrieves the parents of this command. - - If the command has no parents then it returns an empty :class:`list`. - - For example in commands ``?a b c test``, the parents are ``[c, b, a]``. - - .. versionadded:: 1.1 - """ - entries = [] - command = self - while command.parent is not None: # type: ignore - command = command.parent # type: ignore - entries.append(command) - - return entries - - @property - def root_parent(self) -> Group | None: - """Retrieves the root parent of this command. - - If the command has no parents then it returns ``None``. - - For example in commands ``?a b c test``, the root parent is ``a``. - """ - if not self.parent: - return None - return self.parents[-1] - - @property - def qualified_name(self) -> str: - """Retrieves the fully qualified command name. - - This is the full parent name with the command name as well. - For example, in ``?one two three`` the qualified name would be - ``one two three``. - """ - - parent = self.full_parent_name - if parent: - return f"{parent} {self.name}" - else: - return self.name - - def __str__(self) -> str: - return self.qualified_name - async def _parse_arguments(self, ctx: Context) -> None: ctx.args = [ctx] if self.cog is None else [self.cog, ctx] ctx.kwargs = {} @@ -804,265 +472,6 @@ async def _parse_arguments(self, ctx: Context) -> None: f"Too many arguments passed to {self.qualified_name}" ) - async def call_before_hooks(self, ctx: Context) -> None: - # now that we're done preparing we can call the pre-command hooks - # first, call the command local hook: - cog = self.cog - if self._before_invoke is not None: - # should be cog if @commands.before_invoke is used - instance = getattr(self._before_invoke, "__self__", cog) - # __self__ only exists for methods, not functions - # however, if @command.before_invoke is used, it will be a function - if instance: - await self._before_invoke(instance, ctx) # type: ignore - else: - await self._before_invoke(ctx) # type: ignore - - # call the cog local hook if applicable: - if cog is not None: - hook = Cog._get_overridden_method(cog.cog_before_invoke) - if hook is not None: - await hook(ctx) - - # call the bot global hook if necessary - hook = ctx.bot._before_invoke - if hook is not None: - await hook(ctx) - - async def call_after_hooks(self, ctx: Context) -> None: - cog = self.cog - if self._after_invoke is not None: - instance = getattr(self._after_invoke, "__self__", cog) - if instance: - await self._after_invoke(instance, ctx) # type: ignore - else: - await self._after_invoke(ctx) # type: ignore - - # call the cog local hook if applicable: - if cog is not None: - hook = Cog._get_overridden_method(cog.cog_after_invoke) - if hook is not None: - await hook(ctx) - - hook = ctx.bot._after_invoke - if hook is not None: - await hook(ctx) - - def _prepare_cooldowns(self, ctx: Context) -> None: - if self._buckets.valid: - dt = ctx.message.edited_at or ctx.message.created_at - current = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - bucket = self._buckets.get_bucket(ctx.message, current) - if bucket is not None: - retry_after = bucket.update_rate_limit(current) - if retry_after: - raise CommandOnCooldown(bucket, retry_after, self._buckets.type) # type: ignore - - async def prepare(self, ctx: Context) -> None: - ctx.command = self - - if not await self.can_run(ctx): - raise CheckFailure( - f"The check functions for command {self.qualified_name} failed." - ) - - if self._max_concurrency is not None: - # For this application, context can be duck-typed as a Message - await self._max_concurrency.acquire(ctx) # type: ignore - - try: - if self.cooldown_after_parsing: - await self._parse_arguments(ctx) - self._prepare_cooldowns(ctx) - else: - self._prepare_cooldowns(ctx) - await self._parse_arguments(ctx) - - await self.call_before_hooks(ctx) - except: - if self._max_concurrency is not None: - await self._max_concurrency.release(ctx) # type: ignore - raise - - @property - def cooldown(self) -> Cooldown | None: - return self._buckets._cooldown - - def is_on_cooldown(self, ctx: Context) -> bool: - """Checks whether the command is currently on cooldown. - - Parameters - ---------- - ctx: :class:`.Context` - The invocation context to use when checking the command's cooldown status. - - Returns - ------- - :class:`bool` - A boolean indicating if the command is on cooldown. - """ - if not self._buckets.valid: - return False - - bucket = self._buckets.get_bucket(ctx.message) - dt = ctx.message.edited_at or ctx.message.created_at - current = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - return bucket.get_tokens(current) == 0 - - def reset_cooldown(self, ctx: Context) -> None: - """Resets the cooldown on this command. - - Parameters - ---------- - ctx: :class:`.Context` - The invocation context to reset the cooldown under. - """ - if self._buckets.valid: - bucket = self._buckets.get_bucket(ctx.message) - bucket.reset() - - def get_cooldown_retry_after(self, ctx: Context) -> float: - """Retrieves the amount of seconds before this command can be tried again. - - .. versionadded:: 1.4 - - Parameters - ---------- - ctx: :class:`.Context` - The invocation context to retrieve the cooldown from. - - Returns - ------- - :class:`float` - The amount of time left on this command's cooldown in seconds. - If this is ``0.0`` then the command isn't on cooldown. - """ - if self._buckets.valid: - bucket = self._buckets.get_bucket(ctx.message) - dt = ctx.message.edited_at or ctx.message.created_at - current = dt.replace(tzinfo=datetime.timezone.utc).timestamp() - return bucket.get_retry_after(current) - - return 0.0 - - async def invoke(self, ctx: Context) -> None: - await self.prepare(ctx) - - # terminate the invoked_subcommand chain. - # since we're in a regular command (and not a group) then - # the invoked subcommand is None. - ctx.invoked_subcommand = None - ctx.subcommand_passed = None - injected = hooked_wrapped_callback(self, ctx, self.callback) - await injected(*ctx.args, **ctx.kwargs) - - async def reinvoke(self, ctx: Context, *, call_hooks: bool = False) -> None: - ctx.command = self - await self._parse_arguments(ctx) - - if call_hooks: - await self.call_before_hooks(ctx) - - ctx.invoked_subcommand = None - try: - await self.callback(*ctx.args, **ctx.kwargs) # type: ignore - except: - ctx.command_failed = True - raise - finally: - if call_hooks: - await self.call_after_hooks(ctx) - - def error(self, coro: ErrorT) -> ErrorT: - """A decorator that registers a coroutine as a local error handler. - - A local error handler is an :func:`.on_command_error` event limited to - a single command. However, the :func:`.on_command_error` is still - invoked afterwards as the catch-all. - - Parameters - ---------- - coro: :ref:`coroutine ` - The coroutine to register as the local error handler. - - Raises - ------ - TypeError - The coroutine passed is not actually a coroutine. - """ - - if not asyncio.iscoroutinefunction(coro): - raise TypeError("The error handler must be a coroutine.") - - self.on_error: Error = coro - return coro - - def has_error_handler(self) -> bool: - """Checks whether the command has an error handler registered. - - .. versionadded:: 1.7 - """ - return hasattr(self, "on_error") - - def before_invoke(self, coro: HookT) -> HookT: - """A decorator that registers a coroutine as a pre-invoke hook. - - A pre-invoke hook is called directly before the command is - called. This makes it a useful function to set up database - connections or any type of set up required. - - This pre-invoke hook takes a sole parameter, a :class:`.Context`. - - See :meth:`.Bot.before_invoke` for more info. - - Parameters - ---------- - coro: :ref:`coroutine ` - The coroutine to register as the pre-invoke hook. - - Raises - ------ - TypeError - The coroutine passed is not actually a coroutine. - """ - if not asyncio.iscoroutinefunction(coro): - raise TypeError("The pre-invoke hook must be a coroutine.") - - self._before_invoke = coro - return coro - - def after_invoke(self, coro: HookT) -> HookT: - """A decorator that registers a coroutine as a post-invoke hook. - - A post-invoke hook is called directly after the command is - called. This makes it a useful function to clean-up database - connections or any type of clean up required. - - This post-invoke hook takes a sole parameter, a :class:`.Context`. - - See :meth:`.Bot.after_invoke` for more info. - - Parameters - ---------- - coro: :ref:`coroutine ` - The coroutine to register as the post-invoke hook. - - Raises - ------ - TypeError - The coroutine passed is not actually a coroutine. - """ - if not asyncio.iscoroutinefunction(coro): - raise TypeError("The post-invoke hook must be a coroutine.") - - self._after_invoke = coro - return coro - - @property - def cog_name(self) -> str | None: - """The name of the cog this command belongs to, if any.""" - return type(self.cog).__cog_name__ if self.cog is not None else None - @property def short_doc(self) -> str: """Gets the "short" documentation of a command. @@ -1185,30 +594,25 @@ async def can_run(self, ctx: Context) -> bool: try: if not await ctx.bot.can_run(ctx): raise CheckFailure( - "The global check functions for command" - f" {self.qualified_name} failed." + f"The global check functions for command {self.qualified_name} failed." ) - cog = self.cog - if cog is not None: - local_check = Cog._get_overridden_method(cog.cog_check) - if local_check is not None: - ret = await discord.utils.maybe_coroutine(local_check, ctx) - if not ret: - return False + if (cog := self.cog) and ( + local_check := cog._get_overridden_method(cog.cog_check) + ): + ret = await utils.maybe_coroutine(local_check, ctx) + if not ret: + return False predicates = self.checks if not predicates: # since we have no checks, then we just return True. return True - return await discord.utils.async_all(predicate(ctx) for predicate in predicates) # type: ignore + return await utils.async_all(predicate(ctx) for predicate in predicates) finally: ctx.command = original - def _set_cog(self, cog): - self.cog = cog - class GroupMixin(Generic[CogT]): """A mixin that implements common functionality for classes that behave @@ -1256,7 +660,7 @@ def add_command(self, command: Command[CogT, Any, Any]) -> None: :meth:`~.GroupMixin.group` shortcut decorators are used instead. .. versionchanged:: 1.4 - Raise :exc:`.CommandRegistrationError` instead of generic :exc:`.ClientException` + Raise :exc:`.CommandRegistrationError` instead of generic :exc:`.ClientException` Parameters ---------- @@ -1512,7 +916,7 @@ def __init__(self, *args: Any, **attrs: Any) -> None: self.invoke_without_command: bool = attrs.pop("invoke_without_command", False) super().__init__(*args, **attrs) - def copy(self: GroupT) -> GroupT: + def copy(self: GroupT, overrides: dict[str, Any] | None = None) -> GroupT: """Creates a copy of this :class:`Group`. Returns @@ -1520,7 +924,8 @@ def copy(self: GroupT) -> GroupT: :class:`Group` A new instance of this group. """ - ret = super().copy() + ret = super().copy(overrides) + ret.recursively_remove_all_commands() for cmd in self.commands: ret.add_command(cmd.copy()) return ret # type: ignore @@ -1542,7 +947,7 @@ async def invoke(self, ctx: Context) -> None: ctx.invoked_subcommand = self.prefixed_commands.get(trigger, None) if early_invoke: - injected = hooked_wrapped_callback(self, ctx, self.callback) + injected = hook_wrapped_callback(self, ctx, self.callback) await injected(*ctx.args, **ctx.kwargs) ctx.invoked_parents.append(ctx.invoked_with) # type: ignore @@ -1602,7 +1007,6 @@ async def reinvoke(self, ctx: Context, *, call_hooks: bool = False) -> None: @overload # for py 3.10 def command( - name: str = ..., cls: type[Command[CogT, P, T]] = ..., **attrs: Any, ) -> Callable[ @@ -1621,7 +1025,6 @@ def command( @overload def command( - name: str = ..., cls: type[Command[CogT, P, T]] = ..., **attrs: Any, ) -> Callable[ @@ -1638,7 +1041,6 @@ def command( @overload def command( - name: str = ..., cls: type[CommandT] = ..., **attrs: Any, ) -> Callable[ @@ -1654,7 +1056,7 @@ def command( def command( - name: str = MISSING, cls: type[CommandT] = MISSING, **attrs: Any + cls: type[CommandT] = MISSING, **attrs: Any ) -> Callable[ [ ( @@ -1664,7 +1066,7 @@ def command( ], Command[CogT, P, T] | CommandT, ]: - """A decorator that transforms a function into a :class:`.Command` + r"""A decorator that transforms a function into a :class:`.Command` or if called with :func:`.group`, :class:`.Group`. By default the ``help`` attribute is received automatically from the @@ -1678,13 +1080,10 @@ def command( Parameters ---------- - name: :class:`str` - The name to create the command with. By default, this uses the - function name unchanged. cls The class to construct with. By default, this is :class:`.Command`. You usually do not change this. - attrs + \*\*attrs Keyword arguments to pass into the construction of the class denoted by ``cls``. @@ -1704,14 +1103,13 @@ def decorator( ) -> CommandT: if isinstance(func, Command): raise TypeError("Callback is already a command.") - return cls(func, name=name, **attrs) + return cls(func, **attrs) return decorator @overload def group( - name: str = ..., cls: type[Group[CogT, P, T]] = ..., **attrs: Any, ) -> Callable[ @@ -1728,7 +1126,6 @@ def group( @overload def group( - name: str = ..., cls: type[GroupT] = ..., **attrs: Any, ) -> Callable[ @@ -1744,7 +1141,6 @@ def group( def group( - name: str = MISSING, cls: type[GroupT] = MISSING, **attrs: Any, ) -> Callable[ @@ -1766,7 +1162,7 @@ def group( """ if cls is MISSING: cls = Group # type: ignore - return command(name=name, cls=cls, **attrs) # type: ignore + return command(cls=cls, **attrs) # type: ignore def check(predicate: Check) -> Callable[[T], T]: diff --git a/discord/ext/commands/errors.py b/discord/ext/commands/errors.py index c9174054d8..9f37a8205a 100644 --- a/discord/ext/commands/errors.py +++ b/discord/ext/commands/errors.py @@ -27,7 +27,16 @@ from typing import TYPE_CHECKING, Any, Callable -from discord.errors import ClientException, DiscordException +from discord.errors import ( + CheckFailure, + ClientException, + CommandError, + CommandInvokeError, + CommandOnCooldown, + DisabledCommand, + MaxConcurrencyReached, + UserInputError, +) if TYPE_CHECKING: from inspect import Parameter @@ -38,7 +47,6 @@ from .context import Context from .converter import Converter - from .cooldowns import BucketType, Cooldown from .flags import Flag @@ -97,27 +105,6 @@ ) -class CommandError(DiscordException): - r"""The base exception type for all command related errors. - - This inherits from :exc:`discord.DiscordException`. - - This exception and exceptions inherited from it are handled - in a special way as they are caught and passed into a special event - from :class:`.Bot`\, :func:`.on_command_error`. - """ - - def __init__(self, message: str | None = None, *args: Any) -> None: - if message is not None: - # clean-up @everyone and @here mentions - m = message.replace("@everyone", "@\u200beveryone").replace( - "@here", "@\u200bhere" - ) - super().__init__(m, *args) - else: - super().__init__(*args) - - class ConversionError(CommandError): """Exception raised when a Converter class raises non-CommandError. @@ -137,14 +124,6 @@ def __init__(self, converter: Converter, original: Exception) -> None: self.original: Exception = original -class UserInputError(CommandError): - """The base exception type for errors that involve errors - regarding user input. - - This inherits from :exc:`CommandError`. - """ - - class CommandNotFound(CommandError): """Exception raised when a command is attempted to be invoked but no command under that name is found. @@ -189,13 +168,6 @@ class BadArgument(UserInputError): """ -class CheckFailure(CommandError): - """Exception raised when the predicates in :attr:`.Command.checks` have failed. - - This inherits from :exc:`CommandError` - """ - - class CheckAnyFailure(CheckFailure): """Exception raised when all predicates in :func:`check_any` fail. @@ -523,81 +495,6 @@ def __init__(self, argument: str) -> None: super().__init__(f"{argument} is not a recognised boolean option") -class DisabledCommand(CommandError): - """Exception raised when the command being invoked is disabled. - - This inherits from :exc:`CommandError` - """ - - -class CommandInvokeError(CommandError): - """Exception raised when the command being invoked raised an exception. - - This inherits from :exc:`CommandError` - - Attributes - ---------- - original: :exc:`Exception` - The original exception that was raised. You can also get this via - the ``__cause__`` attribute. - """ - - def __init__(self, e: Exception) -> None: - self.original: Exception = e - super().__init__(f"Command raised an exception: {e.__class__.__name__}: {e}") - - -class CommandOnCooldown(CommandError): - """Exception raised when the command being invoked is on cooldown. - - This inherits from :exc:`CommandError` - - Attributes - ---------- - cooldown: :class:`.Cooldown` - A class with attributes ``rate`` and ``per`` similar to the - :func:`.cooldown` decorator. - type: :class:`BucketType` - The type associated with the cooldown. - retry_after: :class:`float` - The amount of seconds to wait before you can retry again. - """ - - def __init__( - self, cooldown: Cooldown, retry_after: float, type: BucketType - ) -> None: - self.cooldown: Cooldown = cooldown - self.retry_after: float = retry_after - self.type: BucketType = type - super().__init__(f"You are on cooldown. Try again in {retry_after:.2f}s") - - -class MaxConcurrencyReached(CommandError): - """Exception raised when the command being invoked has reached its maximum concurrency. - - This inherits from :exc:`CommandError`. - - Attributes - ---------- - number: :class:`int` - The maximum number of concurrent invokers allowed. - per: :class:`.BucketType` - The bucket type passed to the :func:`.max_concurrency` decorator. - """ - - def __init__(self, number: int, per: BucketType) -> None: - self.number: int = number - self.per: BucketType = per - name = per.name - suffix = f"per {name}" if per.name != "default" else "globally" - plural = "%s times %s" if number > 1 else "%s time %s" - fmt = plural % (number, suffix) - super().__init__( - "Too many people are using this command. It can only be used" - f" {fmt} concurrently." - ) - - class MissingRole(CheckFailure): """Exception raised when the command invoker lacks a role to run a command. diff --git a/discord/scheduled_events.py b/discord/scheduled_events.py index f7c0f898d9..944c3b6eab 100644 --- a/discord/scheduled_events.py +++ b/discord/scheduled_events.py @@ -35,7 +35,6 @@ ScheduledEventStatus, try_enum, ) -from .errors import ValidationError from .iterators import ScheduledEventSubscribersIterator from .mixins import Hashable from .object import Object @@ -365,7 +364,7 @@ async def edit( if end_time is MISSING and location.type is ScheduledEventLocationType.external: end_time = self.end_time if end_time is None: - raise ValidationError( + raise TypeError( "end_time needs to be passed if location type is external." ) diff --git a/docs/api/application_commands.rst b/docs/api/application_commands.rst index 2fe39f0760..c1998b98ee 100644 --- a/docs/api/application_commands.rst +++ b/docs/api/application_commands.rst @@ -41,6 +41,14 @@ Shortcut Decorators Objects ~~~~~~~ +.. attributetable:: Invokable +.. autoclass:: Invokable + :members: + +.. attributetable:: BaseContext +.. autoclass:: BaseContext + :members: + .. attributetable:: ApplicationCommand .. autoclass:: ApplicationCommand :members: @@ -53,6 +61,10 @@ Objects .. autoclass:: SlashCommandGroup :members: +.. attributetable:: ContextMenuCommand +.. autoclass:: ContextMenuCommand + :members: + .. attributetable:: UserCommand .. autoclass:: UserCommand :members: diff --git a/docs/api/exceptions.rst b/docs/api/exceptions.rst index f5586f162d..cc0485018b 100644 --- a/docs/api/exceptions.rst +++ b/docs/api/exceptions.rst @@ -23,9 +23,15 @@ Exception Hierarchy - :exc:`Forbidden` - :exc:`NotFound` - :exc:`DiscordServerError` - - :exc:`ApplicationCommandError` + - :exc:`CommandError` + - :exc:`ApplicationCommandError` + - :exc:`ApplicationCommandInvokeError` + - :exc:`CommandInvokeError` - :exc:`CheckFailure` - - :exc:`ApplicationCommandInvokeError` + - :exc:`MaxConcurrencyReached` + - :exc:`CommandOnCooldown` + - :exc:`DisabledCommand` + - :exc:`UserInputError` - :exc:`ExtensionError` - :exc:`ExtensionAlreadyLoaded` - :exc:`ExtensionNotLoaded` @@ -80,13 +86,31 @@ The following exceptions are thrown by the library. .. autoexception:: discord.opus.OpusNotLoaded +.. autoexception:: discord.CommandError + :members: + .. autoexception:: discord.ApplicationCommandError :members: +.. autoexception:: discord.ApplicationCommandInvokeError + :members: + .. autoexception:: discord.CheckFailure :members: -.. autoexception:: discord.ApplicationCommandInvokeError +.. autoexception:: discord.DisabledCommand + :members: + +.. autoexception:: discord.CommandInvokeError + :members: + +.. autoexception:: discord.UserInputError + :members: + +.. autoexception:: discord.CommandOnCooldown + :members: + +.. autoexception:: discord.MaxConcurrencyReached :members: .. autoexception:: discord.ExtensionError diff --git a/docs/ext/commands/api.rst b/docs/ext/commands/api.rst index 2d9af7f378..1031257493 100644 --- a/docs/ext/commands/api.rst +++ b/docs/ext/commands/api.rst @@ -486,33 +486,15 @@ Exceptions .. autoexception:: discord.ext.commands.NoPrivateMessage :members: -.. autoexception:: discord.ext.commands.CheckFailure - :members: - .. autoexception:: discord.ext.commands.CheckAnyFailure :members: .. autoexception:: discord.ext.commands.CommandNotFound :members: -.. autoexception:: discord.ext.commands.DisabledCommand - :members: - -.. autoexception:: discord.ext.commands.CommandInvokeError - :members: - .. autoexception:: discord.ext.commands.TooManyArguments :members: -.. autoexception:: discord.ext.commands.UserInputError - :members: - -.. autoexception:: discord.ext.commands.CommandOnCooldown - :members: - -.. autoexception:: discord.ext.commands.MaxConcurrencyReached - :members: - .. autoexception:: discord.ext.commands.NotOwner :members: @@ -604,9 +586,9 @@ Exception Hierarchy .. exception_hierarchy:: - :exc:`~.DiscordException` - - :exc:`~.commands.CommandError` + - :exc:`~.CommandError` - :exc:`~.commands.ConversionError` - - :exc:`~.commands.UserInputError` + - :exc:`~.UserInputError` - :exc:`~.commands.MissingRequiredArgument` - :exc:`~.commands.TooManyArguments` - :exc:`~.commands.BadArgument` @@ -636,7 +618,7 @@ Exception Hierarchy - :exc:`~.commands.InvalidEndOfQuotedStringError` - :exc:`~.commands.ExpectedClosingQuoteError` - :exc:`~.commands.CommandNotFound` - - :exc:`~.commands.CheckFailure` + - :exc:`~.CheckFailure` - :exc:`~.commands.CheckAnyFailure` - :exc:`~.commands.PrivateMessageOnly` - :exc:`~.commands.NoPrivateMessage` @@ -648,9 +630,9 @@ Exception Hierarchy - :exc:`~.commands.MissingAnyRole` - :exc:`~.commands.BotMissingAnyRole` - :exc:`~.commands.NSFWChannelRequired` - - :exc:`~.commands.DisabledCommand` - - :exc:`~.commands.CommandInvokeError` - - :exc:`~.commands.CommandOnCooldown` - - :exc:`~.commands.MaxConcurrencyReached` + - :exc:`~.DisabledCommand` + - :exc:`~.CommandInvokeError` + - :exc:`~.CommandOnCooldown` + - :exc:`~.MaxConcurrencyReached` - :exc:`~.ClientException` - :exc:`~.commands.CommandRegistrationError`