feat: add alembic operations for vectorizer #266
# SQLAlchemy Integration with pgai Vectorizer

The `embedding_relationship` is a SQLAlchemy helper that integrates pgai's vectorization capabilities directly into your SQLAlchemy models, letting you query vector embeddings created by pgai using familiar SQLAlchemy patterns.

## Installation

To use the SQLAlchemy integration, install pgai with the SQLAlchemy extras:

```bash
pip install "pgai[sqlalchemy]"
```

## Basic Usage

Here's a basic example of how to use the `embedding_relationship`:

```python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from pgai.sqlalchemy import embedding_relationship, EmbeddingModel


class Base(DeclarativeBase):
    pass


class BlogPost(Base):
    __tablename__ = "blog_posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    content: Mapped[str]

    # Add vector embeddings for the content field
    content_embeddings = embedding_relationship(
        dimensions=768
    )
```
Note: if you use Alembic's autogenerate functionality for migrations, also check [Working with alembic](#working-with-alembic).

### Semantic Search

You can then perform semantic similarity search on the field using [pgvector-python's](https://github.com/pgvector/pgvector-python) distance functions:

```python
from sqlalchemy import func, text

similar_posts = (
    session.query(BlogPost.content_embeddings)
    .order_by(
        BlogPost.content_embeddings.embedding.cosine_distance(
            func.ai.openai_embed(
                "text-embedding-3-small",
                "search query",
                text("dimensions => 768")
            )
        )
    )
    .limit(5)
    .all()
)
```

Or, if you already have the embeddings in your application:

```python
similar_posts = (
    session.query(BlogPost.content_embeddings)
    .order_by(
        BlogPost.content_embeddings.embedding.cosine_distance(
            [3, 1, 2]
        )
    )
    .limit(5)
    .all()
)
```
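
For intuition, cosine distance is simply one minus the cosine similarity of the two vectors. The following pure-Python sketch mirrors what pgvector computes server-side (it is illustrative only; the real computation happens in the database):

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    # cosine distance = 1 - cosine similarity = 1 - (a·b) / (|a| * |b|)
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return 1.0 - dot / (norm_a * norm_b)


# A vector has distance ~0 to itself, and distance 1 to an orthogonal vector
print(round(cosine_distance([3, 1, 2], [3, 1, 2]), 9))
print(round(cosine_distance([1.0, 0.0], [0.0, 1.0]), 9))
```

Smaller distances mean more similar chunks, which is why the queries above order ascending by `cosine_distance`.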

## Configuration

The `embedding_relationship` accepts the following parameters:

- `dimensions` (int): The size of the embedding vector (required)
- `target_schema` (str, optional): Override the schema for the embeddings table. If not provided, inherits the parent model's schema
- `target_table` (str, optional): Override the table name for embeddings. Defaults to `{table_name}_embedding_store`

Additional parameters are forwarded to the underlying [SQLAlchemy relationship](https://docs.sqlalchemy.org/en/20/orm/relationships.html), so you can configure it however you like.

Think of `embedding_relationship` as a normal SQLAlchemy relationship, but with a preconfigured model instance under the hood.
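
The forwarding described above can be sketched with a toy stand-in (the function and parameter names here are hypothetical, not pgai's actual implementation): any keyword arguments the helper does not consume itself pass straight through to `relationship()`.

```python
def embedding_relationship_sketch(
    dimensions,
    target_schema=None,
    target_table=None,
    **relationship_kwargs,
):
    # In pgai, relationship_kwargs would be handed to sqlalchemy.orm.relationship()
    return {"dimensions": dimensions, "forwarded": relationship_kwargs}


# lazy and viewonly are standard relationship() options, passed through untouched
cfg = embedding_relationship_sketch(768, lazy="joined", viewonly=True)
print(cfg["forwarded"])  # {'lazy': 'joined', 'viewonly': True}
```

This pattern keeps the helper's own signature small while exposing the full `relationship()` surface.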

## Setting up the Vectorizer

After defining your model, create the vectorizer using pgai's SQL functions:

```sql
SELECT ai.create_vectorizer(
    'blog_posts'::regclass,
    embedding => ai.embedding_openai('text-embedding-3-small', 768),
    chunking => ai.chunking_recursive_character_text_splitter(
        'content',
        50, -- chunk_size
        10  -- chunk_overlap
    )
);
```

We recommend adding this to a migration script and running it via Alembic.

## Querying Embeddings

The `embedding_relationship` provides several ways to work with embeddings:

### 1. Direct Access to Embeddings

Accessed as a class property on your model, `embedding_relationship` provides a SQLAlchemy model that you can query directly:

```python
# Get all embeddings
embeddings = session.query(BlogPost.content_embeddings).all()

# Access embedding properties
for embedding in embeddings:
    print(embedding.embedding)  # The vector embedding
    print(embedding.chunk)      # The text chunk
```
The model has the primary key fields of the parent model as well as the following fields:

- `chunk` (str): The text chunk that was embedded
- `embedding` (Vector): The vector embedding
- `chunk_seq` (int): The sequence number of the chunk
- `embedding_uuid` (str): The UUID of the embedding
- `parent` (ParentModel): The parent model instance
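
The shape of the generated model can be pictured roughly like the dataclass below. This is a hypothetical sketch for illustration (the class name and the real construction are pgai's, built at runtime; the `parent` relationship is omitted here since it is not a plain column):

```python
from dataclasses import dataclass


@dataclass
class BlogPostContentEmbedding:
    id: int                 # primary key inherited from the parent model
    embedding_uuid: str     # UUID of the embedding row
    chunk: str              # the text chunk that was embedded
    embedding: list[float]  # the vector embedding
    chunk_seq: int          # sequence number of the chunk within the document


row = BlogPostContentEmbedding(
    id=1,
    embedding_uuid="...",
    chunk="first chunk of the post",
    embedding=[0.1, 0.2, 0.3],
    chunk_seq=0,
)
print(row.chunk_seq)  # 0
```

One source row typically maps to several embedding rows, one per chunk, distinguished by `chunk_seq`.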

### 2. Relationship Access

You can also iterate over the embeddings through the relationship on a model instance:

```python
blog_post = session.query(BlogPost).first()
for embedding in blog_post.content_embeddings:
    print(embedding.chunk)
```

You can access the original posts through the `parent` relationship:

```python
for embedding in similar_posts:
    print(embedding.parent.title)
```

### 3. Join Queries

You can combine embedding queries with regular SQL queries using the relationship:

```python
results = (
    session.query(BlogPost, BlogPost.content_embeddings)
    .join(BlogPost.content_embeddings)
    .filter(BlogPost.title.ilike("%search term%"))
    .all()
)

for post, embedding in results:
    print(f"Title: {post.title}")
    print(f"Chunk: {embedding.chunk}")
```

## Working with alembic

### Excluding managed tables

The `embedding_relationship` generates a new SQLAlchemy model that is available under the attribute you specify. If you use Alembic's autogenerate functionality to generate migrations, you need to exclude these models from the autogenerate process. Their table names are collected under the `pgai_managed_tables` key of your metadata's `info` dict, and you can exclude them by adding the following to your `env.py`:

```python
def include_object(object, name, type_, reflected, compare_to):
    if type_ == "table" and name in target_metadata.info.get("pgai_managed_tables", set()):
        return False
    return True


context.configure(
    connection=connection,
    target_metadata=target_metadata,
    include_object=include_object
)
```

This prevents Alembic from generating tables for these models when you run `alembic revision --autogenerate`.
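
You can sanity-check the filter's behavior outside Alembic with a stand-in metadata object (the table names below are hypothetical examples):

```python
class FakeMetadata:
    # Mimics MetaData.info with one pgai-managed table name registered
    info = {"pgai_managed_tables": {"blog_posts_embedding_store"}}


target_metadata = FakeMetadata()


def include_object(object, name, type_, reflected, compare_to):
    # Skip tables that pgai manages; include everything else
    if type_ == "table" and name in target_metadata.info.get("pgai_managed_tables", set()):
        return False
    return True


print(include_object(None, "blog_posts_embedding_store", "table", False, None))  # False
print(include_object(None, "blog_posts", "table", False, None))  # True
```

The managed table is filtered out while ordinary tables, and non-table objects such as indexes, still pass through.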

### Creating vectorizers

pgai provides native Alembic operations for managing vectorizers. For them to work, you need to call `register_operations` in your `env.py` file, which registers the pgai operations under the global `op` context:

```python
from pgai.alembic import register_operations

register_operations()
```

You can then use the `create_vectorizer` operation to create a vectorizer for your model, and the `drop_vectorizer` operation to remove it.

```python
from alembic import op
from pgai.vectorizer.configuration import (
    OpenAIEmbeddingConfig,
    CharacterTextSplitterConfig,
    PythonTemplateConfig
)


def upgrade() -> None:
    op.create_vectorizer(
        source_table="blog",
        target_table="blog_embeddings",
        embedding=OpenAIEmbeddingConfig(
            model="text-embedding-3-small",
            dimensions=768
        ),
        chunking=CharacterTextSplitterConfig(
            chunk_column="content",
            chunk_size=800,
            chunk_overlap=400,
            separator=".",
            is_separator_regex=False
        ),
        formatting=PythonTemplateConfig(template="$title - $chunk")
    )


def downgrade() -> None:
    op.drop_vectorizer(vectorizer_id=1, drop_all=True)
```

The `create_vectorizer` operation supports all configuration options available in the [SQL API](vectorizer-api-reference.md).
---
```python
from pgai.alembic.operations import (
    CreateVectorizerOp,
    DropVectorizerOp,
    register_operations,
)

__all__ = ["CreateVectorizerOp", "DropVectorizerOp", "register_operations"]
```
---
```python
from typing import Any

from alembic.operations import MigrateOperation, Operations
from sqlalchemy import text
from typing_extensions import override

from pgai.vectorizer.configuration import (
    CharacterTextSplitterConfig,
    ChunkValueConfig,
    CreateVectorizerParams,
    DiskANNIndexingConfig,
    HNSWIndexingConfig,
    NoIndexingConfig,
    NoSchedulingConfig,
    OllamaEmbeddingConfig,
    OpenAIEmbeddingConfig,
    ProcessingConfig,
    PythonTemplateConfig,
    RecursiveCharacterTextSplitterConfig,
    TimescaleSchedulingConfig,
    VoyageAIEmbeddingConfig,
)


class CreateVectorizerOp(MigrateOperation):
    """Create a vectorizer for automatic embedding generation."""

    def __init__(
        self,
        source_table: str | None,
        embedding: OpenAIEmbeddingConfig
        | OllamaEmbeddingConfig
        | VoyageAIEmbeddingConfig
        | None = None,
        chunking: CharacterTextSplitterConfig
        | RecursiveCharacterTextSplitterConfig
        | None = None,
        indexing: DiskANNIndexingConfig
        | HNSWIndexingConfig
        | NoIndexingConfig
        | None = None,
        formatting: ChunkValueConfig | PythonTemplateConfig | None = None,
        scheduling: TimescaleSchedulingConfig | NoSchedulingConfig | None = None,
        processing: ProcessingConfig | None = None,
        target_schema: str | None = None,
        target_table: str | None = None,
        view_schema: str | None = None,
        view_name: str | None = None,
        queue_schema: str | None = None,
        queue_table: str | None = None,
        grant_to: list[str] | None = None,
        enqueue_existing: bool = True,
    ):
        self.params = CreateVectorizerParams(
            source_table=source_table,
            embedding=embedding,
            chunking=chunking,
            indexing=indexing,
            formatting=formatting,
            scheduling=scheduling,
            processing=processing,
            target_schema=target_schema,
            target_table=target_table,
            view_schema=view_schema,
            view_name=view_name,
            queue_schema=queue_schema,
            queue_table=queue_table,
            grant_to=grant_to,
            enqueue_existing=enqueue_existing,
        )

    @classmethod
    def create_vectorizer(cls, operations: Operations, source_table: str, **kw: Any):
        """Issue a CREATE VECTORIZER command."""
        op = CreateVectorizerOp(source_table, **kw)
        return operations.invoke(op)

    @override
    def reverse(self) -> MigrateOperation:
        """Creates the downgrade operation."""
        return DropVectorizerOp(None, True)


class DropVectorizerOp(MigrateOperation):
    """Drop a vectorizer and its associated objects."""

    def __init__(self, vectorizer_id: int | None, drop_all: bool):
        self.vectorizer_id = vectorizer_id
        self.drop_all = drop_all

    @classmethod
    def drop_vectorizer(
        cls,
        operations: Operations,
        vectorizer_id: int | None = None,
        drop_all: bool = True,
    ):
        """Issue a DROP VECTORIZER command."""
        op = DropVectorizerOp(vectorizer_id, drop_all)
        return operations.invoke(op)

    @override
    def reverse(self) -> MigrateOperation:
        """Creates the upgrade operation."""
        return CreateVectorizerOp(None)


def create_vectorizer(operations: Operations, operation: CreateVectorizerOp):
    """Implement CREATE VECTORIZER."""
    params = operation.params
    operations.execute(params.to_sql())


def drop_vectorizer(operations: Operations, operation: DropVectorizerOp):
    """Implement DROP VECTORIZER with cleanup of dependent objects."""
    connection = operations.get_bind()
    vectorizer_id = operation.vectorizer_id

    # Drop the vectorizer itself
    connection.execute(
        text("SELECT ai.drop_vectorizer(:id, drop_all=>:drop_all)"),
        {"id": vectorizer_id, "drop_all": operation.drop_all},
    )


_operations_registered = False


def register_operations():
    global _operations_registered

    if not _operations_registered:
        Operations.register_operation("create_vectorizer")(CreateVectorizerOp)
        Operations.register_operation("drop_vectorizer")(DropVectorizerOp)
        Operations.implementation_for(CreateVectorizerOp)(create_vectorizer)
        Operations.implementation_for(DropVectorizerOp)(drop_vectorizer)
        _operations_registered = True
```

> **Review comment** (on `register_operations`): is this also only needed for the autogen stuff?
>
> **Reply:** No, this is necessary to make the create and drop vectorizer functions available under Alembic. All the Alembic operations are managed this way; you can read up about the register functions here. They are meant to be used as a decorator, but then they only work if the module is imported. So I think asking the user to explicitly call …
> **Review comment** (on `reverse`): what is the `None` argument for?
>
> **Reply:** The `reverse` function is used to allow reversing migrations (upgrade -> downgrade), and it seems to have to create a valid `MigrateOperation` object. However, to reverse a drop operation I'd have to fully load the current config and pass all the parameters back into the create operation (similar to what the autogenerate PR does). To avoid this I made `CreateVectorizerOp`'s `source_table` nullable. This doesn't create a valid operation, since you can't call `ai.create_vectorizer(NULL)`, but at least the Alembic interface is happy.