Skip to content

Commit

Permalink
Instrument PyMongo AsyncMongoClient (#1254)
Browse files Browse the repository at this point in the history
* Update pymongo instrumentation and add async client

* Add tests for pymongo async

* Update tests for sync pymongo client

* Parametrize uninstrumented test

* Linting

* Add comment to explain instance info

* Update mongodb in CI

* Update mongodb in CI

* Split mongodb CI into v3 and v8

* Fix pymongo v3 failures

* Fix Python 3.7 issue in mongo tests

* Guard attr access

* Guard instance info

* Disable flaskmaster testing

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
TimPansino and mergify[bot] authored Nov 25, 2024
1 parent 67c62da commit 0867109
Show file tree
Hide file tree
Showing 8 changed files with 755 additions and 236 deletions.
67 changes: 65 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ jobs:
- grpc
- kafka
- memcached
- mongodb
- mongodb3
- mongodb8
- mssql
- mysql
- nginx
Expand Down Expand Up @@ -1004,7 +1005,7 @@ jobs:
path: ./**/.coverage.*
retention-days: 1

mongodb:
mongodb3:
env:
TOTAL_GROUPS: 1

Expand Down Expand Up @@ -1066,6 +1067,68 @@ jobs:
path: ./**/.coverage.*
retention-days: 1

# CI job: run the agent's MongoDB test matrix against a MongoDB 8 server
# (parallel to the mongodb3 job; tox environments are selected by job name).
mongodb8:
env:
# Number of groups the tox environment list is split into for sharding.
TOTAL_GROUPS: 1

strategy:
fail-fast: false
matrix:
group-number: [1]

runs-on: ubuntu-latest
container:
image: ghcr.io/newrelic/newrelic-python-agent-ci:latest
options: >-
--add-host=host.docker.internal:host-gateway
timeout-minutes: 30
services:
mongodb:
image: mongo:8.0.3
# NOTE(review): two host ports are published to the same container port
# 27017 — presumably to match the endpoints the tox fixtures expect
# (same pattern as the mongodb3 job); confirm against the test config.
ports:
- 8080:27017
- 8081:27017
# Set health checks to wait until mongodb has started
options: >-
--health-cmd "echo 'db.runCommand(\"ping\").ok' | mongosh localhost:27017/test --quiet || exit 1"
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # 4.1.1

- name: Fetch git tags
run: |
git config --global --add safe.directory "$GITHUB_WORKSPACE"
git fetch --tags origin
- name: Configure pip cache
run: |
mkdir -p /github/home/.cache/pip
chown -R $(whoami) /github/home/.cache/pip
# Select tox environments whose names start with this job's name ("mongodb8-").
- name: Get Environments
id: get-envs
run: |
echo "envs=$(tox -l | grep '^${{ github.job }}\-' | ./.github/workflows/get-envs.py)" >> $GITHUB_OUTPUT
env:
GROUP_NUMBER: ${{ matrix.group-number }}

- name: Test
run: |
tox -vv -e ${{ steps.get-envs.outputs.envs }} -p auto
env:
TOX_PARALLEL_NO_SPINNER: 1
PY_COLORS: 0

- name: Upload Coverage Artifacts
uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # 4.3.1
with:
name: coverage-${{ github.job }}-${{ strategy.job-index }}
path: ./**/.coverage.*
retention-days: 1

elasticsearchserver07:
env:
TOTAL_GROUPS: 1
Expand Down
29 changes: 23 additions & 6 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3584,34 +3584,51 @@ def _process_module_builtin_defaults():
_process_module_definition(
"pymongo.synchronous.pool",
"newrelic.hooks.datastore_pymongo",
"instrument_pymongo_pool",
"instrument_pymongo_synchronous_pool",
)
_process_module_definition(
"pymongo.asynchronous.pool",
"newrelic.hooks.datastore_pymongo",
"instrument_pymongo_asynchronous_pool",
)

_process_module_definition(
"pymongo.synchronous.collection",
"newrelic.hooks.datastore_pymongo",
"instrument_pymongo_collection",
"instrument_pymongo_synchronous_collection",
)
_process_module_definition(
"pymongo.asynchronous.collection",
"newrelic.hooks.datastore_pymongo",
"instrument_pymongo_asynchronous_collection",
)

_process_module_definition(
"pymongo.synchronous.mongo_client",
"newrelic.hooks.datastore_pymongo",
"instrument_pymongo_mongo_client",
"instrument_pymongo_synchronous_mongo_client",
)
_process_module_definition(
"pymongo.asynchronous.mongo_client",
"newrelic.hooks.datastore_pymongo",
"instrument_pymongo_asynchronous_mongo_client",
)

# Older pymongo module locations
_process_module_definition(
"pymongo.connection",
"newrelic.hooks.datastore_pymongo",
"instrument_pymongo_pool",
"instrument_pymongo_synchronous_pool",
)
_process_module_definition(
"pymongo.collection",
"newrelic.hooks.datastore_pymongo",
"instrument_pymongo_collection",
"instrument_pymongo_synchronous_collection",
)
_process_module_definition(
"pymongo.mongo_client",
"newrelic.hooks.datastore_pymongo",
"instrument_pymongo_mongo_client",
"instrument_pymongo_synchronous_mongo_client",
)

# Redis v4.2+
Expand Down
191 changes: 148 additions & 43 deletions newrelic/hooks/datastore_pymongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,54 +14,123 @@

import sys

from newrelic.api.datastore_trace import wrap_datastore_trace
from newrelic.api.datastore_trace import DatastoreTrace
from newrelic.api.function_trace import wrap_function_trace
from newrelic.common.object_wrapper import wrap_function_wrapper

_pymongo_client_methods = (
"save",
"insert",
"update",
"drop",
"remove",
"find_one",
"find",
"count",
_pymongo_client_async_methods = (
"aggregate",
"aggregate_raw_batches",
"bulk_write",
"count_documents",
"create_index",
"ensure_index",
"drop_indexes",
"create_indexes",
"create_search_index",
"create_search_indexes",
"delete_many",
"delete_one",
"distinct",
"drop",
"drop_index",
"reindex",
"drop_indexes",
"drop_search_index",
"estimated_document_count",
"find_one",
"find_one_and_delete",
"find_one_and_replace",
"find_one_and_update",
"index_information",
"insert_many",
"insert_one",
"list_indexes",
"list_search_indexes",
"options",
"group",
"rename",
"distinct",
"map_reduce",
"inline_map_reduce",
"find_and_modify",
"initialize_unordered_bulk_op",
"initialize_ordered_bulk_op",
"bulk_write",
"insert_one",
"insert_many",
"replace_one",
"update_one",
"update_many",
"delete_one",
"delete_many",
"update_one",
"update_search_index",
"watch",
)

_pymongo_client_sync_methods = (
"find_raw_batches",
"find",
# Legacy methods from PyMongo 3
"count",
"ensure_index",
"find_and_modify",
"group",
"initialize_ordered_bulk_op",
"initialize_unordered_bulk_op",
"inline_map_reduce",
"insert",
"map_reduce",
"parallel_scan",
"create_indexes",
"list_indexes",
"aggregate",
"aggregate_raw_batches",
"find_one_and_delete",
"find_one_and_replace",
"find_one_and_update",
"reindex",
"remove",
"save",
"update",
)


def instrument_pymongo_pool(module):
def instance_info(collection):
    """Return the (host, port) of the single connected server, or (None, None).

    Any attribute error, connection-state error, etc. while inspecting the
    client is deliberately swallowed — instance info is best-effort only.
    """
    try:
        known_nodes = list(collection.database.client.nodes)
    except Exception:
        known_nodes = []

    if len(known_nodes) == 1:
        return known_nodes[0]

    # With 0 nodes we're not currently connected; with 2+ nodes we're load
    # balanced. We can't reliably determine the one server actually serving
    # the request, and we can't report more than one server for instance
    # info, so report nothing rather than guess and mislead customers.
    return None, None


def wrap_pymongo_method(module, class_name, method_name, is_async=False):
    """Instrument one pymongo collection method with a DatastoreTrace.

    No-op when ``class_name`` on ``module`` does not define ``method_name``
    (available methods vary across pymongo versions).
    """
    cls = getattr(module, class_name)
    if not hasattr(cls, method_name):
        return

    # Helpers and wrappers are defined as closures so each one captures its
    # own method_name. The shared trace setup / instance-info logic lives in
    # the helpers instead of being duplicated in the sync and async wrappers.
    def _start_trace(instance):
        # Collection or database name may be missing on some objects; guard access.
        target = getattr(instance, "name", None)
        database_name = getattr(getattr(instance, "database", None), "name", None)
        return DatastoreTrace(
            product="MongoDB", target=target, operation=method_name, database_name=database_name
        )

    def _capture_instance_info(trace, instance):
        # Gather instance info after the response to ensure client is connected
        # (fixes typo "conncected" from the previous revision).
        host, port_path_or_id = instance_info(instance)
        trace.host = host
        trace.port_path_or_id = port_path_or_id

    def _wrap_pymongo_method_sync(wrapped, instance, args, kwargs):
        with _start_trace(instance) as trace:
            response = wrapped(*args, **kwargs)
            _capture_instance_info(trace, instance)

        return response

    async def _wrap_pymongo_method_async(wrapped, instance, args, kwargs):
        with _start_trace(instance) as trace:
            response = await wrapped(*args, **kwargs)
            _capture_instance_info(trace, instance)

        return response

    wrapper = _wrap_pymongo_method_async if is_async else _wrap_pymongo_method_sync
    wrap_function_wrapper(module, f"{class_name}.{method_name}", wrapper)


def instrument_pymongo_synchronous_pool(module):
# Exit early if this is a reimport of code from the newer module location
moved_module = "pymongo.synchronous.pool"
if module.__name__ != moved_module and moved_module in sys.modules:
Expand All @@ -77,7 +146,22 @@ def instrument_pymongo_pool(module):
)


def instrument_pymongo_mongo_client(module):
def instrument_pymongo_asynchronous_pool(module):
    """Trace construction of pymongo's async connection objects."""
    # The trace name must be given explicitly because pymongo overrides
    # __getattr__() in a way that breaks introspection.
    wrap_function_trace(
        module,
        "AsyncConnection.__init__",
        name=f"{module.__name__}:AsyncConnection.__init__",
        terminal=True,
        rollup=("Datastore/all", "Datastore/MongoDB/all"),
    )


def instrument_pymongo_synchronous_mongo_client(module):
# Exit early if this is a reimport of code from the newer module location
moved_module = "pymongo.synchronous.mongo_client"
if module.__name__ != moved_module and moved_module in sys.modules:
Expand All @@ -93,17 +177,38 @@ def instrument_pymongo_mongo_client(module):
)


def instrument_pymongo_collection(module):
def instrument_pymongo_asynchronous_mongo_client(module):
    """Trace construction of pymongo's AsyncMongoClient."""
    # The trace name must be given explicitly because pymongo overrides
    # __getattr__() in a way that breaks introspection.
    wrap_function_trace(
        module,
        "AsyncMongoClient.__init__",
        name=f"{module.__name__}:AsyncMongoClient.__init__",
        terminal=True,
        rollup=("Datastore/all", "Datastore/MongoDB/all"),
    )


def instrument_pymongo_synchronous_collection(module):
    """Wrap every traced method on pymongo's synchronous Collection.

    Removes the leftover ``_collection_name`` helper from the previous
    revision, which the new wrapping path never used.
    """
    # Exit early if this is a reimport of code from the newer module location
    moved_module = "pymongo.synchronous.collection"
    if module.__name__ != moved_module and moved_module in sys.modules:
        return

    if hasattr(module, "Collection"):
        for method_name in _pymongo_client_sync_methods:
            wrap_pymongo_method(module, "Collection", method_name, is_async=False)
        for method_name in _pymongo_client_async_methods:
            # Intentionally set is_async=False for sync collection
            wrap_pymongo_method(module, "Collection", method_name, is_async=False)


for name in _pymongo_client_methods:
if hasattr(module.Collection, name):
wrap_datastore_trace(
module, f"Collection.{name}", product="MongoDB", target=_collection_name, operation=name
)
def instrument_pymongo_asynchronous_collection(module):
    """Wrap every traced method on pymongo's AsyncCollection."""
    if not hasattr(module, "AsyncCollection"):
        return

    # Methods in the sync list are wrapped without awaiting even on the
    # async collection; only those in the async list get the async wrapper.
    for method_names, use_async in (
        (_pymongo_client_sync_methods, False),
        (_pymongo_client_async_methods, True),
    ):
        for method_name in method_names:
            wrap_pymongo_method(module, "AsyncCollection", method_name, is_async=use_async)
1 change: 1 addition & 0 deletions tests/datastore_pymongo/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from testing_support.fixture.event_loop import event_loop as loop # noqa
from testing_support.fixtures import ( # noqa: F401; pylint: disable=W0611
collector_agent_registration_fixture,
collector_available_fixture,
Expand Down
Loading

0 comments on commit 0867109

Please sign in to comment.