Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support native asyncpg connection pools #409

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 80 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,42 @@ currently supports the following asyncio database drivers:

[asyncio]: https://docs.python.org/3/library/asyncio.html

#### Asyncpg Connection Pool

```python
import asyncpg
from google.cloud.alloydb.connector import AsyncConnector

async def main():
# initialize Connector object for connections to AlloyDB
connector = AsyncConnector()

# creation function to generate asyncpg connections as the 'connect' arg
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
)

# initialize connection pool
pool = await asyncpg.create_pool(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
connect=getconn,
)

# acquire connection and query AlloyDB database
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")

# close Connector
await connector.close()
```

#### SQLAlchemy Async Engine

```python
import asyncpg

Expand All @@ -260,7 +296,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
# initialize Connector object for connections to AlloyDB
# creation function to generate asyncpg connections as 'async_creator' arg
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
Expand Down Expand Up @@ -311,6 +347,39 @@ visit the [official documentation][asyncpg-docs].
The `AsyncConnector` also may be used as an async context manager, removing the
need for explicit calls to `connector.close()` to cleanup resources.

#### Asyncpg Connection Pool

```python
import asyncpg
from google.cloud.alloydb.connector import AsyncConnector

async def main():
# initialize AsyncConnector object for connections to AlloyDB
async with AsyncConnector() as connector:

# creation function to generate asyncpg connections as the 'connect' arg
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
)

# create connection pool
pool = await asyncpg.create_pool(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
connect=getconn,
)

# acquire connection and query AlloyDB database
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")
```

#### SQLAlchemy Async Engine

```python
import asyncio
import asyncpg
Expand All @@ -321,17 +390,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
# initialize Connector object for connections to AlloyDB
# creation function to generate asyncpg connections as 'async_creator' arg
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn
conn: asyncpg.Connection = await connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn

# The AlloyDB Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
install_requires=dependencies,
extras_require={
"pg8000": ["pg8000>=1.31.1"],
"asyncpg": ["asyncpg>=0.29.0"],
"asyncpg": ["asyncpg>=0.30.0"],
},
python_requires=">=3.9",
include_package_data=True,
Expand Down
96 changes: 93 additions & 3 deletions tests/system/test_asyncpg_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import os
from typing import Tuple
from typing import Any, Tuple

# [START alloydb_sqlalchemy_connect_async_connector]
import asyncpg
Expand Down Expand Up @@ -88,7 +88,65 @@ async def getconn() -> asyncpg.Connection:
# [END alloydb_sqlalchemy_connect_async_connector]


async def test_connection_with_asyncpg() -> None:
async def create_asyncpg_pool(
instance_connection_name: str,
user: str,
password: str,
db: str,
refresh_strategy: str = "background",
) -> tuple[asyncpg.Pool, AsyncConnector]:
"""Creates a native asyncpg connection pool for an AlloyDB instance and
returns the pool and the connector. Callers are responsible for closing the
pool and the connector.

A sample invocation looks like:

pool, connector = await create_asyncpg_pool(
inst_conn_name,
user,
password,
db,
)
async with pool.acquire() as conn:
hello = await conn.fetch("SELECT 'Hello World!'")
# do something with query result
await connector.close()

Args:
instance_connection_name (str):
The instance connection name specifies the instance relative to the
project and region. For example: "my-project:my-region:my-instance"
user (str):
The database user name, e.g., postgres
password (str):
The database user's password, e.g., secret-password
db (str):
The name of the database, e.g., mydb
refresh_strategy (Optional[str]):
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
or "background". For serverless environments use "lazy" to avoid
errors resulting from CPU being throttled.
"""
connector = AsyncConnector(refresh_strategy=refresh_strategy)

async def getconn(
instance_connection_name: str, **kwargs: Any
) -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
instance_connection_name,
"asyncpg",
user=user,
password=password,
db=db,
)
return conn

# create native asyncpg pool (requires asyncpg version >=0.30.0)
pool = await asyncpg.create_pool(instance_connection_name, connect=getconn)
return pool, connector


async def test_sqlalchemy_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
user = os.environ["ALLOYDB_USER"]
Expand All @@ -104,7 +162,7 @@ async def test_connection_with_asyncpg() -> None:
await connector.close()


async def test_lazy_connection_with_asyncpg() -> None:
async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
user = os.environ["ALLOYDB_USER"]
Expand All @@ -120,3 +178,35 @@ async def test_lazy_connection_with_asyncpg() -> None:
assert res[0] == 1

await connector.close()


async def test_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
user = os.environ["ALLOYDB_USER"]
password = os.environ["ALLOYDB_PASS"]
db = os.environ["ALLOYDB_DB"]

pool, connector = await create_asyncpg_pool(inst_uri, user, password, db)

async with pool.acquire() as conn:
res = await conn.fetch("SELECT 1")
assert res[0][0] == 1

await connector.close()


async def test_lazy_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
user = os.environ["ALLOYDB_USER"]
password = os.environ["ALLOYDB_PASS"]
db = os.environ["ALLOYDB_DB"]

pool, connector = await create_asyncpg_pool(inst_uri, user, password, db, "lazy")

async with pool.acquire() as conn:
res = await conn.fetch("SELECT 1")
assert res[0][0] == 1

await connector.close()
Loading