Skip to content

Commit

Permalink
Introduced service time metrics to opensearch-py client
Browse files: browse the repository at this point in the history
Signed-off-by: saimedhi <[email protected]>
  • Loading branch information
saimedhi committed Mar 18, 2024
1 parent 4b69c09 commit 5ea57d3
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added a log collection guide ([#579](https://github.com/opensearch-project/opensearch-py/pull/579))
- Added GHA release ([#614](https://github.com/opensearch-project/opensearch-py/pull/614))
- Incorporated API generation into CI workflow and fixed 'generate' nox session ([#660](https://github.com/opensearch-project/opensearch-py/pull/660))
- Introduced `service time` metrics to opensearch-py client ([#689](https://github.com/opensearch-project/opensearch-py/pull/689))
- Added an automated api update bot for opensearch-py ([#664](https://github.com/opensearch-project/opensearch-py/pull/664))
### Changed
- Updated the `get_policy` API in the index_management plugin to allow the policy_id argument as optional ([#633](https://github.com/opensearch-project/opensearch-py/pull/633))
Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ sphinx_rtd_theme
jinja2
pytz
deepmerge
Events

# No wheels for Python 3.10 yet!
numpy; python_version<"3.10"
Expand Down
6 changes: 5 additions & 1 deletion opensearchpy/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Any, Optional, Type

from opensearchpy.client.utils import _normalize_hosts
from opensearchpy.metrics import Metrics
from opensearchpy.transport import Transport


Expand All @@ -22,6 +23,7 @@ def __init__(
self,
hosts: Optional[str] = None,
transport_class: Type[Transport] = Transport,
metrics: Optional[Metrics] = None,
**kwargs: Any
) -> None:
"""
Expand All @@ -38,4 +40,6 @@ class as kwargs, or a string in the format of ``host[:port]`` which will be
:class:`~opensearchpy.Transport` class and, subsequently, to the
:class:`~opensearchpy.Connection` instances.
"""
self.transport = transport_class(_normalize_hosts(hosts), **kwargs)
self.transport = transport_class(
_normalize_hosts(hosts), metrics=metrics, **kwargs
)
18 changes: 17 additions & 1 deletion opensearchpy/connection/http_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
except ImportError:
REQUESTS_AVAILABLE = False

from opensearchpy.metrics import Metrics

from ..compat import reraise_exceptions, string_types, urlencode
from ..exceptions import (
ConnectionError,
Expand Down Expand Up @@ -86,8 +88,10 @@ def __init__(
http_compress: Any = None,
opaque_id: Any = None,
pool_maxsize: Any = None,
metrics: Optional[Metrics] = None,
**kwargs: Any
) -> None:
self.metrics = metrics
if not REQUESTS_AVAILABLE:
raise ImproperlyConfigured(
"Please install requests to use RequestsHttpConnection."
Expand Down Expand Up @@ -188,7 +192,11 @@ def perform_request( # type: ignore
}
send_kwargs.update(settings)
try:
if self.metrics is not None:
self.metrics.request_start()
response = self.session.send(prepared_request, **send_kwargs)
if self.metrics is not None:
self.metrics.request_end()
duration = time.time() - start
raw_data = response.content.decode("utf-8", "surrogatepass")
except reraise_exceptions:
Expand Down Expand Up @@ -244,7 +252,15 @@ def perform_request( # type: ignore
duration,
)

return response.status_code, response.headers, raw_data
if self.metrics is None:
return response.status_code, response.headers, raw_data
else:
return (
response.status_code,
response.headers,
raw_data,
self.metrics.service_time,
)

@property
def headers(self) -> Any: # type: ignore
Expand Down
19 changes: 17 additions & 2 deletions opensearchpy/connection/http_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from urllib3.exceptions import SSLError as UrllibSSLError
from urllib3.util.retry import Retry

from opensearchpy.metrics import Metrics

from ..compat import reraise_exceptions, urlencode
from ..exceptions import (
ConnectionError,
Expand Down Expand Up @@ -115,8 +117,10 @@ def __init__(
ssl_context: Any = None,
http_compress: Any = None,
opaque_id: Any = None,
metrics: Optional[Metrics] = None,
**kwargs: Any
) -> None:
self.metrics = metrics
# Initialize headers before calling super().__init__().
self.headers = urllib3.make_headers(keep_alive=True)

Expand Down Expand Up @@ -267,10 +271,13 @@ def perform_request(
if self.http_auth is not None:
if isinstance(self.http_auth, Callable): # type: ignore
request_headers.update(self.http_auth(method, full_url, body))

if self.metrics is not None:
self.metrics.request_start()
response = self.pool.urlopen(
method, url, body, retries=Retry(False), headers=request_headers, **kw
)
if self.metrics is not None:
self.metrics.request_end()
duration = time.time() - start
raw_data = response.data.decode("utf-8", "surrogatepass")
except reraise_exceptions:
Expand Down Expand Up @@ -304,7 +311,15 @@ def perform_request(
method, full_url, url, orig_body, response.status, raw_data, duration
)

return response.status, response.headers, raw_data
if self.metrics is None:
return response.status, response.headers, raw_data
else:
return (
response.status,
response.headers,
raw_data,
self.metrics.service_time,
)

def get_response_headers(self, response: Any) -> Any:
return {header.lower(): value for header, value in response.headers.items()}
Expand Down
65 changes: 65 additions & 0 deletions opensearchpy/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

import time
from abc import ABC, abstractmethod

from events import Events


class Metrics(ABC):
    """Abstract interface for objects that record timing of a client request.

    Concrete subclasses must implement the two notification hooks
    (``request_start`` / ``request_end``) and the two read-only timing
    properties (``start_time`` / ``service_time``).
    """

    @property
    @abstractmethod
    def start_time(self) -> float:
        """Return the recorded start time of the request."""

    @property
    @abstractmethod
    def service_time(self) -> float:
        """Return the recorded service time of the request."""

    @abstractmethod
    def request_start(self) -> None:
        """Notify the metrics object that a request is starting."""

    @abstractmethod
    def request_end(self) -> None:
        """Notify the metrics object that a request has ended."""


class MetricsEvents(Metrics):
    """``Metrics`` implementation that times a request through ``Events`` hooks.

    ``request_start()`` and ``request_end()`` fire the corresponding events on
    ``self.events``; the handlers subscribed in ``__init__`` stamp
    ``time.perf_counter()`` and derive the service time from the two stamps.
    Additional handlers can be attached to ``self.events.request_start`` /
    ``self.events.request_end`` with ``+=``.

    The instance keeps a single start/end pair, so each new ``request_start``
    overwrites the previous start stamp.
    """

    def __init__(self) -> None:
        self.events = Events()
        # Declare every timing field up front so the instance has a complete
        # state (and no attribute is missing) before the first request ends.
        self._start_time = 0.0
        self._end_time = 0.0
        self._service_time = 0.0

        # Subscribe to the request_start and request_end events
        self.events.request_start += self._on_request_start
        self.events.request_end += self._on_request_end

    @property
    def start_time(self) -> float:
        """``perf_counter`` stamp of the latest request start (0.0 initially)."""
        return self._start_time

    @property
    def service_time(self) -> float:
        """Seconds between the latest start and end stamps (0.0 initially)."""
        return self._service_time

    def request_start(self) -> None:
        """Fire the ``request_start`` event."""
        self.events.request_start()

    def _on_request_start(self) -> None:
        """Record the start stamp for the request."""
        self._start_time = time.perf_counter()

    def request_end(self) -> None:
        """Fire the ``request_end`` event."""
        self.events.request_end()

    def _on_request_end(self) -> None:
        """Record the end stamp and compute the elapsed service time."""
        self._end_time = time.perf_counter()
        self._service_time = self._end_time - self._start_time
45 changes: 35 additions & 10 deletions opensearchpy/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from itertools import chain
from typing import Any, Callable, Collection, Dict, List, Mapping, Optional, Type, Union

from opensearchpy.metrics import Metrics

from .connection import Connection, Urllib3HttpConnection
from .connection_pool import ConnectionPool, DummyConnectionPool, EmptyConnectionPool
from .exceptions import (
Expand Down Expand Up @@ -91,6 +93,7 @@ class Transport(object):
last_sniff: float
sniff_timeout: Optional[float]
host_info_callback: Any
metrics: Optional[Metrics]

def __init__(
self,
Expand All @@ -112,6 +115,7 @@ def __init__(
retry_on_status: Collection[int] = (502, 503, 504),
retry_on_timeout: bool = False,
send_get_body_as: str = "GET",
metrics: Optional[Metrics] = None,
**kwargs: Any
) -> None:
"""
Expand Down Expand Up @@ -153,6 +157,7 @@ def __init__(
when creating and instance unless overridden by that connection's
options provided as part of the hosts parameter.
"""
self.metrics = metrics
if connection_class is None:
connection_class = self.DEFAULT_CONNECTION_CLASS

Expand Down Expand Up @@ -242,7 +247,7 @@ def _create_connection(host: Any) -> Any:
kwargs.update(host)
if self.pool_maxsize and isinstance(self.pool_maxsize, int):
kwargs["pool_maxsize"] = self.pool_maxsize
return self.connection_class(**kwargs)
return self.connection_class(metrics=self.metrics, **kwargs)

connections = list(zip(map(_create_connection, hosts), hosts))
if len(connections) == 1:
Expand Down Expand Up @@ -405,15 +410,31 @@ def perform_request(
connection = self.get_connection()

try:
status, headers_response, data = connection.perform_request(
method,
url,
params,
body,
headers=headers,
ignore=ignore,
timeout=timeout,
)
if self.metrics:
(
status,
headers_response,
data,
service_time,
) = connection.perform_request(
method,
url,
params,
body,
headers=headers,
ignore=ignore,
timeout=timeout,
)
else:
status, headers_response, data = connection.perform_request(
method,
url,
params,
body,
headers=headers,
ignore=ignore,
timeout=timeout,
)

# Lowercase all the header names for consistency in accessing them.
headers_response = {
Expand Down Expand Up @@ -457,6 +478,10 @@ def perform_request(
data = self.deserializer.loads(
data, headers_response.get("content-type")
)

if self.metrics:
data["client_metrics"] = {"service_time": service_time}

return data

def close(self) -> Any:
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"six",
"python-dateutil",
"certifi>=2022.12.07",
"Events",
]
tests_require = [
"requests>=2.0.0, <3.0.0",
Expand Down

0 comments on commit 5ea57d3

Please sign in to comment.