Skip to content

Commit

Permalink
feat: support native asyncpg connection pools
Browse files Browse the repository at this point in the history
This changed is analogous to CloudSQL's change:
GoogleCloudPlatform/cloud-sql-python-connector#1182
  • Loading branch information
rhatgadkar-goog committed Jan 9, 2025
1 parent 40bada7 commit d81da4f
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 15 deletions.
91 changes: 80 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,42 @@ currently supports the following asyncio database drivers:

[asyncio]: https://docs.python.org/3/library/asyncio.html

#### Asyncpg Connection Pool

```python
import asyncpg
from google.cloud.alloydb.connector import AsyncConnector

async def main():
# initialize Connector object for connections to AlloyDB
connector = AsyncConnector()

# creation function to generate asyncpg connections as the 'connect' arg
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
)

# initialize connection pool
pool = await asyncpg.create_pool(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
connect=getconn,
)

# acquire connection and query AlloyDB database
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")

# close Connector
await connector.close()
```

#### SQLAlchemy Async Engine

```python
import asyncpg

Expand All @@ -260,7 +296,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
# initialize Connector object for connections to AlloyDB
# creation function to generate asyncpg connections as 'async_creator' arg
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
Expand Down Expand Up @@ -311,6 +347,39 @@ visit the [official documentation][asyncpg-docs].
The `AsyncConnector` also may be used as an async context manager, removing the
need for explicit calls to `connector.close()` to cleanup resources.

#### Asyncpg Connection Pool

```python
import asyncpg
from google.cloud.alloydb.connector import AsyncConnector

async def main():
# initialize AsyncConnector object for connections to AlloyDB
async with AsyncConnector() as connector:

# creation function to generate asyncpg connections as the 'connect' arg
async def getconn(instance_connection_name, **kwargs) -> asyncpg.Connection:
return await connector.connect(
instance_connection_name,
"asyncpg",
user="my-user",
password="my-password",
db="my-db",
)

# create connection pool
pool = await asyncpg.create_pool(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
connect=getconn,
)

# acquire connection and query AlloyDB database
async with pool.acquire() as conn:
res = await conn.fetch("SELECT NOW()")
```

#### SQLAlchemy Async Engine

```python
import asyncio
import asyncpg
Expand All @@ -321,17 +390,17 @@ from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from google.cloud.alloydb.connector import AsyncConnector

async def init_connection_pool(connector: AsyncConnector) -> AsyncEngine:
# initialize Connector object for connections to AlloyDB
# creation function to generate asyncpg connections as 'async_creator' arg
async def getconn() -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn
conn: asyncpg.Connection = await connector.connect(
"projects/<YOUR_PROJECT>/locations/<YOUR_REGION>/clusters/<YOUR_CLUSTER>/instances/<YOUR_INSTANCE>",
"asyncpg",
user="my-user",
password="my-password",
db="my-db-name"
# ... additional database driver args
)
return conn

# The AlloyDB Python Connector can be used along with SQLAlchemy using the
# 'async_creator' argument to 'create_async_engine'
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
install_requires=dependencies,
extras_require={
"pg8000": ["pg8000>=1.31.1"],
"asyncpg": ["asyncpg>=0.29.0"],
"asyncpg": ["asyncpg>=0.30.0"],
},
python_requires=">=3.9",
include_package_data=True,
Expand Down
98 changes: 95 additions & 3 deletions tests/system/test_asyncpg_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import os
from typing import Tuple
from typing import Any, Tuple

# [START alloydb_sqlalchemy_connect_async_connector]
import asyncpg
Expand Down Expand Up @@ -88,7 +88,65 @@ async def getconn() -> asyncpg.Connection:
# [END alloydb_sqlalchemy_connect_async_connector]


async def test_connection_with_asyncpg() -> None:
async def create_asyncpg_pool(
instance_connection_name: str,
user: str,
password: str,
db: str,
refresh_strategy: str = "background",
) -> tuple[asyncpg.Pool, AsyncConnector]:
"""Creates a native asyncpg connection pool for an AlloyDB instance and
returns the pool and the connector. Callers are responsible for closing the
pool and the connector.
A sample invocation looks like:
pool, connector = await create_asyncpg_pool(
inst_conn_name,
user,
password,
db,
)
async with pool.acquire() as conn:
hello = await conn.fetch("SELECT 'Hello World!'")
# do something with query result
await connector.close()
Args:
instance_connection_name (str):
The instance connection name specifies the instance relative to the
project and region. For example: "my-project:my-region:my-instance"
user (str):
The database user name, e.g., postgres
password (str):
The database user's password, e.g., secret-password
db (str):
The name of the database, e.g., mydb
refresh_strategy (Optional[str]):
Refresh strategy for the Cloud SQL Connector. Can be one of "lazy"
or "background". For serverless environments use "lazy" to avoid
errors resulting from CPU being throttled.
"""
connector = AsyncConnector(refresh_strategy=refresh_strategy)

async def getconn(
instance_connection_name: str, **kwargs: Any
) -> asyncpg.Connection:
conn: asyncpg.Connection = await connector.connect(
instance_connection_name,
"asyncpg",
user=user,
password=password,
db=db,
)
return conn

# create native asyncpg pool (requires asyncpg version >=0.30.0)
pool = await asyncpg.create_pool(instance_connection_name, connect=getconn)
return pool, connector


async def test_sqlalchemy_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
user = os.environ["ALLOYDB_USER"]
Expand All @@ -104,7 +162,7 @@ async def test_connection_with_asyncpg() -> None:
await connector.close()


async def test_lazy_connection_with_asyncpg() -> None:
async def test_lazy_sqlalchemy_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
user = os.environ["ALLOYDB_USER"]
Expand All @@ -120,3 +178,37 @@ async def test_lazy_connection_with_asyncpg() -> None:
assert res[0] == 1

await connector.close()


async def test_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
user = os.environ["ALLOYDB_USER"]
password = os.environ["ALLOYDB_PASS"]
db = os.environ["ALLOYDB_DB"]

pool, connector = await create_asyncpg_pool(inst_uri, user, password, db)

async with pool.acquire() as conn:
res = await conn.fetch("SELECT 1")
assert res[0][0] == 1

await connector.close()


async def test_lazy_connection_with_asyncpg() -> None:
"""Basic test to get time from database."""
inst_uri = os.environ["ALLOYDB_INSTANCE_URI"]
user = os.environ["ALLOYDB_USER"]
password = os.environ["ALLOYDB_PASS"]
db = os.environ["ALLOYDB_DB"]

pool, connector = await create_asyncpg_pool(
inst_uri, user, password, db, "lazy"
)

async with pool.acquire() as conn:
res = await conn.fetch("SELECT 1")
assert res[0][0] == 1

await connector.close()

0 comments on commit d81da4f

Please sign in to comment.