feat: add alembic operations for vectorizer #266
Open · wants to merge 21 commits into main · Changes from 18 commits
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -109,7 +109,7 @@ jobs:

       - name: Install dependencies
         working-directory: ./projects/pgai
-        run: uv sync
+        run: uv sync --all-extras

       - name: Lint
         run: just pgai lint
@@ -122,6 +122,8 @@ jobs:

       - name: Run Tests
         run: just pgai test
+        env:
+          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

       - name: Build the pgai distributable and check artifacts
         run: just pgai build
228 changes: 228 additions & 0 deletions docs/python-integration.md
@@ -0,0 +1,228 @@
# SQLAlchemy Integration with pgai Vectorizer

The `embedding_relationship` is a SQLAlchemy helper that integrates pgai's vectorization capabilities directly into your SQLAlchemy models. This allows you to easily query vector embeddings created by pgai using familiar SQLAlchemy patterns.

## Installation

To use the SQLAlchemy integration, install pgai with the SQLAlchemy extras:

```bash
pip install "pgai[sqlalchemy]"
```

## Basic Usage

Here's a basic example of how to use the `embedding_relationship`:

```python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from pgai.sqlalchemy import embedding_relationship, EmbeddingModel


class Base(DeclarativeBase):
    pass


class BlogPost(Base):
    __tablename__ = "blog_posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    content: Mapped[str]

    # Add vector embeddings for the content field
    content_embeddings = embedding_relationship(
        dimensions=768
    )
```
Note: if you use alembic's autogenerate functionality for migrations, also see [Working with alembic](#working-with-alembic).

### Semantic Search

You can then perform semantic similarity search on the field using [pgvector-python's](https://github.com/pgvector/pgvector-python) distance functions:

```python
from sqlalchemy import func, text

similar_posts = (
    session.query(BlogPost.content_embeddings)
    .order_by(
        BlogPost.content_embeddings.embedding.cosine_distance(
            func.ai.openai_embed(
                "text-embedding-3-small",
                "search query",
                text("dimensions => 768"),
            )
        )
    )
    .limit(5)
    .all()
)
```

Or if you already have the embeddings in your application:

```python
similar_posts = (
    session.query(BlogPost.content_embeddings)
    .order_by(
        BlogPost.content_embeddings.embedding.cosine_distance([3, 1, 2])
    )
    .limit(5)
    .all()
)
```

## Configuration

The `embedding_relationship` accepts the following parameters:

- `dimensions` (int): The size of the embedding vector (required)
- `target_schema` (str, optional): Override the schema for the embeddings table. If not provided, inherits from the parent model's schema
- `target_table` (str, optional): Override the table name for embeddings. Default is `{table_name}_embedding_store`

Additional parameters are simply forwarded to the underlying [SQLAlchemy relationship](https://docs.sqlalchemy.org/en/20/orm/relationships.html) so you can configure it as you desire.

Think of the `embedding_relationship` as a normal SQLAlchemy relationship, but with a preconfigured model instance under the hood.
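For instance, the overrides and the forwarded relationship arguments can be combined. The following is an illustrative sketch only (the `Article` model, the schema name `embeddings`, and the table name `article_body_embeddings` are hypothetical, not part of pgai):

```python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from pgai.sqlalchemy import embedding_relationship


class Base(DeclarativeBase):
    pass


class Article(Base):
    __tablename__ = "articles"

    id: Mapped[int] = mapped_column(primary_key=True)
    body: Mapped[str]

    # Store embeddings in a custom schema and table instead of the
    # default "articles_embedding_store" (names are illustrative).
    body_embeddings = embedding_relationship(
        dimensions=768,
        target_schema="embeddings",
        target_table="article_body_embeddings",
        lazy="joined",  # extra kwargs are forwarded to relationship()
    )
```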


## Setting up the Vectorizer

After defining your model, you need to create the vectorizer using pgai's SQL functions:

```sql
SELECT ai.create_vectorizer(
    'blog_posts'::regclass,
    embedding => ai.embedding_openai('text-embedding-3-small', 768),
    chunking => ai.chunking_recursive_character_text_splitter(
        'content',
        50, -- chunk_size
        10  -- chunk_overlap
    )
);
```

We recommend adding this to a migration script and running it via alembic.
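A migration wrapping the SQL above could look like this sketch (the revision identifiers are placeholders, and the vectorizer id used in `downgrade` is an assumption, since pgai assigns ids when vectorizers are created):

```python
"""Illustrative alembic migration creating the blog_posts vectorizer."""
from alembic import op

# revision identifiers, used by Alembic (placeholders)
revision = "a1b2c3d4e5f6"
down_revision = None


def upgrade() -> None:
    op.execute("""
        SELECT ai.create_vectorizer(
            'blog_posts'::regclass,
            embedding => ai.embedding_openai('text-embedding-3-small', 768),
            chunking => ai.chunking_recursive_character_text_splitter(
                'content',
                50, -- chunk_size
                10  -- chunk_overlap
            )
        );
    """)


def downgrade() -> None:
    # drop_all => true also removes the embeddings table and view;
    # the id 1 assumes this is the first vectorizer in the database.
    op.execute("SELECT ai.drop_vectorizer(1, drop_all => true);")
```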


## Querying Embeddings

The `embedding_relationship` provides several ways to work with embeddings:

### 1. Direct Access to Embeddings

If you access the class property of your model, the `embedding_relationship` provides a SQLAlchemy model that you can query directly:

```python
# Get all embeddings
embeddings = session.query(BlogPost.content_embeddings).all()

# Access embedding properties
for embedding in embeddings:
    print(embedding.embedding)  # The vector embedding
    print(embedding.chunk)  # The text chunk
```
The model will have the primary key fields of the parent model as well as the following fields:
- `chunk` (str): The text chunk that was embedded
- `embedding` (Vector): The vector embedding
- `chunk_seq` (int): The sequence number of the chunk
- `embedding_uuid` (str): The UUID of the embedding
- `parent` (ParentModel): The parent model instance

### 2. Relationship Access

Access a post's embeddings through the relationship:
```python
blog_post = session.query(BlogPost).first()
for embedding in blog_post.content_embeddings:
    print(embedding.chunk)
```
You can access the original posts through the `parent` relationship:
```python
for embedding in similar_posts:
    print(embedding.parent.title)
```

### 3. Join Queries

You can combine embedding queries with regular SQL queries using the relationship:

```python
results = (
    session.query(BlogPost, BlogPost.content_embeddings)
    .join(BlogPost.content_embeddings)
    .filter(BlogPost.title.ilike("%search term%"))
    .all()
)

for post, embedding in results:
    print(f"Title: {post.title}")
    print(f"Chunk: {embedding.chunk}")
```

## Working with alembic

### Excluding managed tables
The `embedding_relationship` generates a new SQLAlchemy model that is available under the attribute you specify. If you use alembic's autogenerate functionality to generate migrations, you need to exclude these models from the autogenerate process.
They are tracked in your metadata under the key `pgai_managed_tables`, and you can exclude them by adding the following to your `env.py`:

```python
def include_object(object, name, type_, reflected, compare_to):
    if type_ == "table" and name in target_metadata.info.get("pgai_managed_tables", set()):
        return False
    return True


context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_object=include_object,
)
```

This should now prevent alembic from generating tables for these models when you run `alembic revision --autogenerate`.
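To see what the hook is doing, here is a self-contained sketch of the same logic using a stub in place of the real SQLAlchemy `MetaData` object (the table name is a hypothetical example of a pgai-managed embeddings table):

```python
# Stand-in for the target_metadata in alembic's env.py: a stub whose
# info dict lists one pgai-managed table (name is hypothetical).
class StubMetadata:
    info = {"pgai_managed_tables": {"blog_posts_embedding_store"}}


target_metadata = StubMetadata()


def include_object(object, name, type_, reflected, compare_to):
    # Exclude tables that pgai manages; include everything else.
    if type_ == "table" and name in target_metadata.info.get("pgai_managed_tables", set()):
        return False
    return True


print(include_object(None, "blog_posts_embedding_store", "table", False, None))  # False
print(include_object(None, "blog_posts", "table", False, None))  # True
```

The hook only filters objects of type `"table"`, so indexes and other objects on excluded tables are still passed through to alembic's own filtering.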


### Creating vectorizers
pgai provides native Alembic operations for managing vectorizers. For them to work, you need to call `register_operations` in your `env.py` file, which registers the pgai operations under the global `op` context:

```python
from pgai.alembic import register_operations

register_operations()
```

Then you can use the `create_vectorizer` operation to create a vectorizer for your model, as well as the `drop_vectorizer` operation to remove it:

```python
from alembic import op
from pgai.vectorizer.configuration import (
    OpenAIEmbeddingConfig,
    CharacterTextSplitterConfig,
    PythonTemplateConfig,
)


def upgrade() -> None:
    op.create_vectorizer(
        source_table="blog",
        target_table='blog_embeddings',
        embedding=OpenAIEmbeddingConfig(
            model='text-embedding-3-small',
            dimensions=768,
        ),
        chunking=CharacterTextSplitterConfig(
            chunk_column='content',
            chunk_size=800,
            chunk_overlap=400,
            separator='.',
            is_separator_regex=False,
        ),
        formatting=PythonTemplateConfig(template='$title - $chunk'),
    )


def downgrade() -> None:
    op.drop_vectorizer(vectorizer_id=1, drop_all=True)
```

The `create_vectorizer` operation supports all configuration options available in the [SQL API](vectorizer-api-reference.md).
7 changes: 7 additions & 0 deletions projects/pgai/pgai/alembic/__init__.py
@@ -0,0 +1,7 @@
from pgai.alembic.operations import (
    CreateVectorizerOp,
    DropVectorizerOp,
    register_operations,
)

__all__ = ["CreateVectorizerOp", "DropVectorizerOp", "register_operations"]
137 changes: 137 additions & 0 deletions projects/pgai/pgai/alembic/operations.py
@@ -0,0 +1,137 @@
from typing import Any

from alembic.operations import MigrateOperation, Operations
from sqlalchemy import text
from typing_extensions import override

from pgai.vectorizer.configuration import (
    CharacterTextSplitterConfig,
    ChunkValueConfig,
    CreateVectorizerParams,
    DiskANNIndexingConfig,
    HNSWIndexingConfig,
    NoIndexingConfig,
    NoSchedulingConfig,
    OllamaEmbeddingConfig,
    OpenAIEmbeddingConfig,
    ProcessingConfig,
    PythonTemplateConfig,
    RecursiveCharacterTextSplitterConfig,
    TimescaleSchedulingConfig,
    VoyageAIEmbeddingConfig,
)


class CreateVectorizerOp(MigrateOperation):
    """Create a vectorizer for automatic embedding generation."""

    def __init__(
        self,
        source_table: str | None,
        embedding: OpenAIEmbeddingConfig
        | OllamaEmbeddingConfig
        | VoyageAIEmbeddingConfig
        | None = None,
        chunking: CharacterTextSplitterConfig
        | RecursiveCharacterTextSplitterConfig
        | None = None,
        indexing: DiskANNIndexingConfig
        | HNSWIndexingConfig
        | NoIndexingConfig
        | None = None,
        formatting: ChunkValueConfig | PythonTemplateConfig | None = None,
        scheduling: TimescaleSchedulingConfig | NoSchedulingConfig | None = None,
        processing: ProcessingConfig | None = None,
        target_schema: str | None = None,
        target_table: str | None = None,
        view_schema: str | None = None,
        view_name: str | None = None,
        queue_schema: str | None = None,
        queue_table: str | None = None,
        grant_to: list[str] | None = None,
        enqueue_existing: bool = True,
    ):
        self.params = CreateVectorizerParams(
            source_table=source_table,
            embedding=embedding,
            chunking=chunking,
            indexing=indexing,
            formatting=formatting,
            scheduling=scheduling,
            processing=processing,
            target_schema=target_schema,
            target_table=target_table,
            view_schema=view_schema,
            view_name=view_name,
            queue_schema=queue_schema,
            queue_table=queue_table,
            grant_to=grant_to,
            enqueue_existing=enqueue_existing,
        )

    @classmethod
    def create_vectorizer(cls, operations: Operations, source_table: str, **kw: Any):
        """Issue a CREATE VECTORIZER command."""
        op = CreateVectorizerOp(source_table, **kw)
        return operations.invoke(op)

    @override
    def reverse(self) -> MigrateOperation:
        """Creates the downgrade operation"""
        return DropVectorizerOp(None, True)


class DropVectorizerOp(MigrateOperation):
    """Drop a vectorizer and its associated objects."""

    def __init__(self, vectorizer_id: int | None, drop_all: bool):
        self.vectorizer_id = vectorizer_id
        self.drop_all = drop_all

    @classmethod
    def drop_vectorizer(
        cls,
        operations: Operations,
        vectorizer_id: int | None = None,
        drop_all: bool = True,
    ):
        """Issue a DROP VECTORIZER command."""
        op = DropVectorizerOp(vectorizer_id, drop_all)
        return operations.invoke(op)

    @override
    def reverse(self) -> MigrateOperation:
        """Creates the upgrade operation"""
        return CreateVectorizerOp(None)
> **Collaborator:** what is the None argument for?
>
> **Contributor Author:** The reverse function is used to allow reversing migrations (upgrade -> downgrade), and it seems to have to create a valid `MigrateOperation` object. However, to reverse a drop operation I'd have to fully load the current config and pass all the parameters back into the create operation (similar to what the autogenerate PR does).
>
> To avoid this I made `CreateVectorizerOp`'s `source_table` nullable. This doesn't create a valid operation, since you can't call `ai.create_vectorizer(NULL)`, but at least the alembic interface is happy.



def create_vectorizer(operations: Operations, operation: CreateVectorizerOp):
    """Implement CREATE VECTORIZER."""
    params = operation.params
    operations.execute(params.to_sql())


def drop_vectorizer(operations: Operations, operation: DropVectorizerOp):
    """Implement DROP VECTORIZER with cleanup of dependent objects."""
    connection = operations.get_bind()
    vectorizer_id = operation.vectorizer_id

    # Finally drop the vectorizer itself
    connection.execute(
        text("SELECT ai.drop_vectorizer(:id, drop_all=>:drop_all)"),
        {"id": vectorizer_id, "drop_all": operation.drop_all},
    )


_operations_registered = False


def register_operations():
> **Collaborator:** is this also only needed for the autogen stuff?
>
> **Contributor Author:** No, this is necessary to make the create and drop vectorizer functions available under the alembic `op` context, so you can call `op.create_vectorizer`.
>
> All the alembic operations are managed this way. You can read up about the register functions here. They are meant to be used as decorators, but then they only work if the module is imported. So I think asking the user to explicitly call `register_operations` is less confusing.


    global _operations_registered

    if not _operations_registered:
        Operations.register_operation("create_vectorizer")(CreateVectorizerOp)
        Operations.register_operation("drop_vectorizer")(DropVectorizerOp)
        Operations.implementation_for(CreateVectorizerOp)(create_vectorizer)
        Operations.implementation_for(DropVectorizerOp)(drop_vectorizer)
        _operations_registered = True