Skip to content

Commit

Permalink
refactor!(webhooks): revert rate limit prediction implementation
Browse files Browse the repository at this point in the history
Webhook rate limits are too unpredictable.
  • Loading branch information
VincentRPS committed Jan 3, 2024
1 parent 6a2aefe commit 4b9a3fc
Showing 1 changed file with 25 additions and 40 deletions.
65 changes: 25 additions & 40 deletions discord/webhook/async_.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import json
import logging
import re
import weakref
from contextvars import ContextVar
from typing import TYPE_CHECKING, Any, Literal, NamedTuple, overload
from urllib.parse import quote as urlquote
Expand All @@ -51,7 +52,6 @@
from ..message import Attachment, Message
from ..mixins import Hashable
from ..object import Object
from ..rate_limiting import DynamicBucket
from ..threads import Thread
from ..user import BaseUser, User

Expand Down Expand Up @@ -101,6 +101,9 @@ async def __aexit__(self, type, value, traceback):


class AsyncWebhookAdapter:
def __init__(self):
self._locks: weakref.WeakValueDictionary = weakref.WeakValueDictionary()

async def request(
self,
route: Route,
Expand All @@ -118,7 +121,12 @@ async def request(
headers: dict[str, str] = {}
files = files or []
to_send: str | aiohttp.FormData | None = None
bucket_id = f"{route.webhook_id}:{route.webhook_token}"
bucket = (route.webhook_id, route.webhook_token)

try:
lock = self._locks[bucket]
except KeyError:
self._locks[bucket] = lock = asyncio.Lock()

if payload is not None:
headers["Content-Type"] = "application/json"
Expand All @@ -136,12 +144,7 @@ async def request(
url = route.url
webhook_id = route.webhook_id

tbucket = await self._rate_limit.temp_bucket(bucket_id)

if tbucket:
await tbucket.wait()

async with self._rate_limit.webhook_global_concurrency:
async with AsyncDeferredLock(lock) as lock:
for attempt in range(5):
for file in files:
file.reset(seek=attempt)
Expand Down Expand Up @@ -176,12 +179,17 @@ async def request(
data = json.loads(data)

remaining = response.headers.get("X-Ratelimit-Remaining")
reset_after = response.headers.get("X-Ratelimit-Reset-After")

if remaining:
remaining = int(remaining)
if reset_after:
reset_after = float(reset_after)
if remaining == "0" and response.status != 429:
delta = utils._parse_ratelimit_header(response)
_log.debug(
(
"Webhook ID %s has been pre-emptively rate limited,"
" waiting %.2f seconds"
),
webhook_id,
delta,
)
lock.delay_by(delta)

if 300 > response.status >= 200:
return data
Expand All @@ -199,30 +207,7 @@ async def request(
webhook_id,
retry_after,
)

retry_after: float = data["retry_after"]
is_global: bool = data.get("global", False)

if is_global:
self.global_dynamo = DynamicBucket()
await self.global_dynamo.executed(
retry_after,
remaining or 10,
is_global=is_global,
)
self.global_dynamo = None
else:
tbucket = DynamicBucket()
await self._rate_limit.push_temp_bucket(
bucket_id, tbucket
)
await tbucket.executed(
retry_after, remaining or 10, is_global=True
)
await self._rate_limit.pop_temp_bucket(bucket_id)

_log.debug("Done sleeping for the rate limit. Retrying...")

await asyncio.sleep(retry_after)
continue

if response.status >= 500:
Expand Down Expand Up @@ -1161,7 +1146,7 @@ async def foo():
.. versionadded:: 2.0
"""

__slots__: tuple[str, ...] = ("session", "proxy", "proxy_auth", "_bucket_storage")
__slots__: tuple[str, ...] = ("session", "proxy", "proxy_auth")

def __init__(
self,
Expand Down Expand Up @@ -2036,4 +2021,4 @@ async def delete_message(
proxy=self.proxy,
proxy_auth=self.proxy_auth,
thread_id=thread_id,
)
)

0 comments on commit 4b9a3fc

Please sign in to comment.