Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] async credential fetch support #834

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion opensearchpy/connection/http_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@


import asyncio
import inspect
import os
import ssl
import warnings
Expand Down Expand Up @@ -196,10 +197,15 @@ async def perform_request(
auth = (
self._http_auth if isinstance(self._http_auth, aiohttp.BasicAuth) else None
)

if callable(self._http_auth):
http_auth_result = self._http_auth(method, url, query_string, body)
if inspect.isawaitable(http_auth_result):
http_auth_result = await http_auth_result

req_headers = {
**req_headers,
**self._http_auth(method, url, query_string, body),
**http_auth_result,
}

start = self.loop.time()
Expand Down
31 changes: 20 additions & 11 deletions opensearchpy/helpers/asyncsigner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,22 @@
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.
import inspect
from typing import Dict, Optional, Union, TYPE_CHECKING

from typing import Any, Dict, Optional, Union
if TYPE_CHECKING:
from botocore.credentials import Credentials, RefreshableCredentials
from aiobotocore.credentials import AioCredentials, AioRefreshableCredentials

CredentialTypes = Credentials | RefreshableCredentials | AioCredentials | AioRefreshableCredentials


class AWSV4SignerAsyncAuth:
"""
AWS V4 Request Signer for Async Requests.
"""

def __init__(self, credentials: Any, region: str, service: str = "es") -> None:
def __init__(self, credentials: 'CredentialTypes', region: str, service: str = "es") -> None:
if not credentials:
raise ValueError("Credentials cannot be empty")
self.credentials = credentials
Expand All @@ -28,16 +34,16 @@ def __init__(self, credentials: Any, region: str, service: str = "es") -> None:
raise ValueError("Service name cannot be empty")
self.service = service

def __call__(
async def __call__(
self,
method: str,
url: str,
query_string: Optional[str] = None,
body: Optional[Union[str, bytes]] = None,
) -> Dict[str, str]:
return self._sign_request(method, url, query_string, body)
return await self._sign_request(method, url, query_string, body)

def _sign_request(
async def _sign_request(
self,
method: str,
url: str,
Expand Down Expand Up @@ -67,12 +73,15 @@ def _sign_request(
# correspond to the secret_key used to sign the request. To avoid this,
# get_frozen_credentials() which returns non-refreshing credentials is
# called if it exists.
credentials = (
self.credentials.get_frozen_credentials()
if hasattr(self.credentials, "get_frozen_credentials")
and callable(self.credentials.get_frozen_credentials)
else self.credentials
)
if (
hasattr(self.credentials, "get_frozen_credentials")
and callable(self.credentials.get_frozen_credentials)
):
credentials = self.credentials.get_frozen_credentials()
if inspect.isawaitable(credentials):
credentials = await credentials
else:
credentials = self.credentials

sig_v4_auth = SigV4Auth(credentials, self.service, self.region)
sig_v4_auth.add_auth(aws_request)
Expand Down
Loading