-
Notifications
You must be signed in to change notification settings - Fork 69
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test: update tests to be more scalable (#1156)
- Loading branch information
1 parent
bc13665
commit 610209d
Showing
7 changed files
with
556 additions
and
360 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,77 +13,103 @@ | |
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
""" | ||
|
||
import asyncio | ||
import os | ||
from typing import AsyncGenerator | ||
import uuid | ||
from typing import Tuple | ||
|
||
import asyncpg | ||
import pytest | ||
import sqlalchemy | ||
from sqlalchemy.ext.asyncio import AsyncEngine | ||
from sqlalchemy.ext.asyncio import create_async_engine | ||
import sqlalchemy.ext.asyncio | ||
|
||
from google.cloud.sql.connector import Connector | ||
|
||
table_name = f"books_{uuid.uuid4().hex}" | ||
|
||
async def create_sqlalchemy_engine( | ||
instance_connection_name: str, | ||
user: str, | ||
db: str, | ||
refresh_strategy: str = "background", | ||
) -> Tuple[sqlalchemy.ext.asyncio.engine.AsyncEngine, Connector]: | ||
"""Creates a connection pool for a Cloud SQL instance and returns the pool | ||
and the connector. Callers are responsible for closing the pool and the | ||
connector. | ||
A sample invocation looks like: | ||
engine, connector = await create_sqlalchemy_engine( | ||
inst_conn_name, | ||
user, | ||
db, | ||
) | ||
async with engine.connect() as conn: | ||
time = (await conn.execute(sqlalchemy.text("SELECT NOW()"))).fetchone() | ||
curr_time = time[0] | ||
# do something with query result | ||
await connector.close_async() | ||
Args: | ||
instance_connection_name (str): | ||
The instance connection name specifies the instance relative to the | ||
project and region. For example: "my-project:my-region:my-instance" | ||
user (str): | ||
The formatted IAM database username. | ||
e.g., [email protected], [email protected] | ||
db (str): | ||
The name of the database, e.g., mydb | ||
refresh_strategy (Optional[str]): | ||
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy" | ||
or "background". For serverless environments use "lazy" to avoid | ||
errors resulting from CPU being throttled. | ||
""" | ||
loop = asyncio.get_running_loop() | ||
connector = Connector(loop=loop, refresh_strategy=refresh_strategy) | ||
|
||
# The Cloud SQL Python Connector can be used along with SQLAlchemy using the | ||
# 'async_creator' argument to 'create_async_engine' | ||
async def init_connection_pool() -> AsyncEngine: | ||
async def getconn() -> asyncpg.Connection: | ||
loop = asyncio.get_running_loop() | ||
# initialize Connector object for connections to Cloud SQL | ||
async with Connector(loop=loop) as connector: | ||
conn: asyncpg.Connection = await connector.connect_async( | ||
os.environ["POSTGRES_IAM_CONNECTION_NAME"], | ||
"asyncpg", | ||
user=os.environ["POSTGRES_IAM_USER"], | ||
db=os.environ["POSTGRES_DB"], | ||
enable_iam_auth=True, | ||
) | ||
return conn | ||
conn: asyncpg.Connection = await connector.connect_async( | ||
instance_connection_name, | ||
"asyncpg", | ||
user=user, | ||
db=db, | ||
ip_type="public", # can also be "private" or "psc" | ||
enable_iam_auth=True, | ||
) | ||
return conn | ||
|
||
# create SQLAlchemy connection pool | ||
pool = create_async_engine( | ||
engine = sqlalchemy.ext.asyncio.create_async_engine( | ||
"postgresql+asyncpg://", | ||
async_creator=getconn, | ||
execution_options={"isolation_level": "AUTOCOMMIT"}, | ||
) | ||
return pool | ||
return engine, connector | ||
|
||
|
||
@pytest.fixture(name="pool") | ||
async def setup() -> AsyncGenerator: | ||
pool = await init_connection_pool() | ||
async with pool.connect() as conn: | ||
await conn.execute( | ||
sqlalchemy.text( | ||
f"CREATE TABLE IF NOT EXISTS {table_name}" | ||
" ( id CHAR(20) NOT NULL, title TEXT NOT NULL );" | ||
) | ||
) | ||
async def test_iam_authn_connection_with_asyncpg() -> None: | ||
"""Basic test to get time from database.""" | ||
inst_conn_name = os.environ["POSTGRES_IAM_CONNECTION_NAME"] | ||
user = os.environ["POSTGRES_IAM_USER"] | ||
db = os.environ["POSTGRES_DB"] | ||
|
||
yield pool | ||
pool, connector = await create_sqlalchemy_engine(inst_conn_name, user, db) | ||
|
||
async with pool.connect() as conn: | ||
await conn.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {table_name}")) | ||
# dispose of asyncpg connection pool | ||
await pool.dispose() | ||
res = (await conn.execute(sqlalchemy.text("SELECT 1"))).fetchone() | ||
assert res[0] == 1 | ||
|
||
await connector.close_async() | ||
|
||
@pytest.mark.asyncio | ||
async def test_connection_with_asyncpg_iam_auth(pool: AsyncEngine) -> None: | ||
insert_stmt = sqlalchemy.text( | ||
f"INSERT INTO {table_name} (id, title) VALUES (:id, :title)", | ||
) | ||
async with pool.connect() as conn: | ||
await conn.execute(insert_stmt, parameters={"id": "book1", "title": "Book One"}) | ||
await conn.execute(insert_stmt, parameters={"id": "book2", "title": "Book Two"}) | ||
|
||
select_stmt = sqlalchemy.text(f"SELECT title FROM {table_name} ORDER BY ID;") | ||
rows = (await conn.execute(select_stmt)).fetchall() | ||
titles = [row[0] for row in rows] | ||
async def test_lazy_iam_authn_connection_with_asyncpg() -> None: | ||
"""Basic test to get time from database.""" | ||
inst_conn_name = os.environ["POSTGRES_IAM_CONNECTION_NAME"] | ||
user = os.environ["POSTGRES_IAM_USER"] | ||
db = os.environ["POSTGRES_DB"] | ||
|
||
pool, connector = await create_sqlalchemy_engine(inst_conn_name, user, db, "lazy") | ||
|
||
async with pool.connect() as conn: | ||
res = (await conn.execute(sqlalchemy.text("SELECT 1"))).fetchone() | ||
assert res[0] == 1 | ||
|
||
assert titles == ["Book One", "Book Two"] | ||
await connector.close_async() |
Oops, something went wrong.