Skip to content

Commit

Permalink
chore: add system test
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwotherspoon committed Oct 2, 2024
1 parent 65876d8 commit 4d95c50
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 15 deletions.
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ asyncpg==0.29.0
python-tds==1.15.0
aioresponses==0.7.6
pytest-aiohttp==1.0.5
gevent==24.2.1
104 changes: 104 additions & 0 deletions tests/system/test_pg8000_gevent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime
import os
from typing import Tuple

from gevent import monkey

monkey.patch_all()

import pg8000
import sqlalchemy

from google.cloud.sql.connector import Connector


def create_sqlalchemy_engine(
instance_connection_name: str,
user: str,
password: str,
db: str,
refresh_strategy: str = "background",
) -> Tuple[sqlalchemy.engine.Engine, Connector]:
"""Creates a connection pool for a Cloud SQL instance and returns the pool
and the connector. Callers are responsible for closing the pool and the
connector.
A sample invocation looks like:
engine, connector = create_sqlalchemy_engine(
inst_conn_name,
user,
password,
db,
)
with engine.connect() as conn:
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
conn.commit()
curr_time = time[0]
# do something with query result
connector.close()
Args:
instance_connection_name (str):
The instance connection name specifies the instance relative to the
project and region. For example: "my-project:my-region:my-instance"
user (str):
The database user name, e.g., postgres
password (str):
The database user's password, e.g., secret-password
db (str):
The name of the database, e.g., mydb
refresh_strategy (Optional[str]):
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
or "background". For serverless environments use "lazy" to avoid
errors resulting from CPU being throttled.
"""
connector = Connector(refresh_strategy=refresh_strategy)

def getconn() -> pg8000.dbapi.Connection:
conn: pg8000.dbapi.Connection = connector.connect(
instance_connection_name,
"pg8000",
user=user,
password=password,
db=db,
ip_type="public", # can also be "private" or "psc"
)
return conn

# create SQLAlchemy connection pool
engine = sqlalchemy.create_engine(
"postgresql+pg8000://",
creator=getconn,
)
return engine, connector


def test_gevent_pg8000_connection() -> None:
"""Basic test to get time from database with gevent."""
inst_conn_name = os.environ["POSTGRES_CONNECTION_NAME"]
user = os.environ["POSTGRES_USER"]
password = os.environ["POSTGRES_PASS"]
db = os.environ["POSTGRES_DB"]

engine, connector = create_sqlalchemy_engine(inst_conn_name, user, password, db)
with engine.connect() as conn:
time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone()
conn.commit()
curr_time = time[0]
assert type(curr_time) is datetime
connector.close()
15 changes: 0 additions & 15 deletions tests/unit/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,6 @@ def test_connect_with_unsupported_driver(fake_credentials: Credentials) -> None:
assert exc_info.value.args[0] == "Driver 'bad_driver' is not supported."


@pytest.mark.asyncio
async def test_connect_ConnectorLoopError(fake_credentials: Credentials) -> None:
"""Test that ConnectorLoopError is thrown when Connector.connect
is called with event loop running in current thread."""
current_loop = asyncio.get_running_loop()
connector = Connector(credentials=fake_credentials, loop=current_loop)
# try to connect using current thread's loop, should raise error
pytest.raises(
ConnectorLoopError,
connector.connect,
"my-project:my-region:my-instance",
"pg8000",
)


def test_Connector_Init(fake_credentials: Credentials) -> None:
"""Test that Connector __init__ sets default properties properly."""
with patch("google.auth.default") as mock_auth:
Expand Down

0 comments on commit 4d95c50

Please sign in to comment.