Skip to content

Commit

Permalink
[ENH]: Add property test log service (#1969)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - Add property testing for log service
  • Loading branch information
nicolasgere authored Apr 4, 2024
1 parent 0f8b5b6 commit 74cc70c
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 9 deletions.
1 change: 1 addition & 0 deletions .github/workflows/chroma-coordinator-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v3
- uses: ariga/setup-atlas@v0
- name: Build and test
run: cd go && make test
env:
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

**/__pycache__

go/bin/
go/**/testdata/
go/coordinator/bin/

Expand Down
5 changes: 5 additions & 0 deletions go/bin/migrate_up_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Apply the log-service database schema to the test database.
# $1 - connection URL of the target (test) database.
# Uses a throwaway Postgres 15 docker container as the Atlas dev database
# and applies without prompting (test-only; do not use against prod).
atlas schema apply \
-u "$1" \
--to file://database/log/schema/ \
--dev-url "docker://postgres/15/dev" \
--auto-approve
10 changes: 8 additions & 2 deletions go/database/log/db/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/database/log/queries/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ FOR UPDATE;
INSERT INTO record_log (collection_id, "offset", record) values($1, $2, $3);

-- name: GetRecordsForCollection :many
SELECT * FROM record_log r WHERE r.collection_id = $1 AND r.offset > $2 ORDER BY r.offset DESC limit $3 ;
SELECT * FROM record_log r WHERE r.collection_id = $1 AND r.offset >= $2 and r.timestamp <= $4 ORDER BY r.offset ASC limit $3 ;

-- name: GetAllCollectionsToCompact :many
with summary as (
Expand Down
3 changes: 2 additions & 1 deletion go/pkg/log/repository/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string,
return
}

func (r *LogRepository) PullRecords(ctx context.Context, collectionId string, offset int64, batchSize int) (records []log.RecordLog, err error) {
// PullRecords fetches up to batchSize records for the given collection,
// starting at the given offset (inclusive, per the >= $2 predicate in
// GetRecordsForCollection) and limited to records with timestamp <= the
// supplied timestamp, ordered by offset ascending.
//
// NOTE(review): timestamp is narrowed to int32 for the query parameter; a
// Unix-seconds timestamp overflows int32 in 2038 — confirm the column type
// and consider widening the parameter to int64.
func (r *LogRepository) PullRecords(ctx context.Context, collectionId string, offset int64, batchSize int, timestamp int) (records []log.RecordLog, err error) {
	records, err = r.queries.GetRecordsForCollection(ctx, log.GetRecordsForCollectionParams{
		CollectionID: collectionId,
		Offset:       offset,
		Limit:        int32(batchSize),
		Timestamp:    int32(timestamp),
	})
	return
}
Expand Down
158 changes: 158 additions & 0 deletions go/pkg/log/server/property_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package server

import (
"context"
"github.com/chroma-core/chroma/go/pkg/log/repository"
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
"github.com/chroma-core/chroma/go/pkg/proto/logservicepb"
"github.com/chroma-core/chroma/go/pkg/types"
libs2 "github.com/chroma-core/chroma/go/shared/libs"
"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"pgregory.net/rapid"
"testing"
"time"
)

// ModelState is the in-memory reference model that the property test
// compares the real log service against.
type ModelState struct {
	// CollectionEnumerationOffset — NOTE(review): declared but not written
	// by the visible test; confirm intended use before relying on it.
	CollectionEnumerationOffset map[types.UniqueID]int64
	// CollectionData holds every record pushed per collection, in push order
	// (log offsets are 1-based indices into these slices).
	CollectionData map[types.UniqueID][]*coordinatorpb.OperationRecord
	// CollectionCompactionOffset is the last compaction offset recorded per
	// collection via UpdateCollectionLogOffset.
	CollectionCompactionOffset map[types.UniqueID]int64
}

// LogServerTestSuite wires a real log service (backed by a containerized
// Postgres, see SetupSuite) together with the reference ModelState used by
// the rapid property test.
type LogServerTestSuite struct {
	suite.Suite
	// logServer is the system under test.
	logServer logservicepb.LogServiceServer
	// model mirrors the expected state of the service.
	model ModelState
	// t is the outer *testing.T, captured so assert/rapid can report
	// through it from suite methods.
	t *testing.T
}

// SetupSuite starts a disposable Postgres container, runs the schema
// migration against it, and constructs the log server under test plus a
// fresh, fully initialized ModelState.
func (suite *LogServerTestSuite) SetupSuite() {
	ctx := context.Background()
	connectionString, err := libs2.StartPgContainer(ctx)
	assert.NoError(suite.t, err, "Failed to start pg container")
	var conn *pgx.Conn
	conn, err = libs2.NewPgConnection(ctx, connectionString)
	assert.NoError(suite.t, err, "Failed to create new pg connection")
	err = libs2.RunMigration(ctx, connectionString)
	assert.NoError(suite.t, err, "Failed to run migration")
	lr := repository.NewLogRepository(conn)
	suite.logServer = NewLogServer(lr)
	suite.model = ModelState{
		// Initialize every map declared on ModelState.
		// CollectionEnumerationOffset was previously left nil, which would
		// panic on the first write to it.
		CollectionEnumerationOffset: map[types.UniqueID]int64{},
		CollectionData:              map[types.UniqueID][]*coordinatorpb.OperationRecord{},
		CollectionCompactionOffset:  map[types.UniqueID]int64{},
	}
}

// TestRecordLogDb_PushLogs is a rapid state-machine property test: it drives
// the log service through random interleavings of pushLogs / pullLogs /
// compaction-offset updates and checks every response against ModelState.
func (suite *LogServerTestSuite) TestRecordLogDb_PushLogs() {
	ctx := context.Background()
	// Fixed pool of collection ids so random operations repeatedly target
	// the same collections.
	collections := make([]types.UniqueID, 10)
	for i := 0; i < len(collections); i++ {
		collections[i] = types.NewUniqueID()
	}

	// Generator that picks one of the pre-created collection ids.
	collectionGen := rapid.Custom(func(t *rapid.T) types.UniqueID {
		return collections[rapid.IntRange(0, len(collections)-1).Draw(t, "collection_id")]
	})
	// Generator for a batch of operation records with random id and vector bytes.
	recordGen := rapid.SliceOf(rapid.Custom(func(t *rapid.T) *coordinatorpb.OperationRecord {
		data := rapid.SliceOf(rapid.Byte()).Draw(t, "record_data")
		id := rapid.String().Draw(t, "record_id")
		return &coordinatorpb.OperationRecord{
			Id: id,
			Vector: &coordinatorpb.Vector{
				Vector: data,
			},
		}
	}))
	rapid.Check(suite.t, func(t *rapid.T) {
		t.Repeat(map[string]func(*rapid.T){
			"pushLogs": func(t *rapid.T) {
				c := collectionGen.Draw(t, "collection")
				records := recordGen.Draw(t, "record")
				r, err := suite.logServer.PushLogs(ctx, &logservicepb.PushLogsRequest{
					CollectionId: c.String(),
					Records:      records,
				})
				if err != nil {
					t.Fatal(err)
				}
				if int32(len(records)) != r.RecordCount {
					t.Fatal("record count mismatch", len(records), r.RecordCount)
				}
				// Mirror the push in the model.
				suite.model.CollectionData[c] = append(suite.model.CollectionData[c], records...)
			},
			"getAllCollectionsToCompact": func(t *rapid.T) {
				result, err := suite.logServer.GetAllCollectionInfoToCompact(ctx, &logservicepb.GetAllCollectionInfoToCompactRequest{})
				// Fail via t.Fatal so the rapid iteration aborts immediately,
				// consistent with the other operations (assert.NoError on the
				// outer suite.t would not stop this callback).
				if err != nil {
					t.Fatal(err)
				}
				for _, collection := range result.AllCollectionInfo {
					id, err := types.Parse(collection.CollectionId)
					if err != nil {
						t.Fatal(err)
					}
					// Advance the compaction offset to a random position no
					// greater than the number of records pushed to THIS
					// collection. (Bug fix: the upper bound was
					// len(suite.model.CollectionData) — the number of
					// collections in the map — which both capped offsets at 10
					// and could push the offset past the collection's record
					// count, later making Int64Range's min exceed its max.)
					compactionOffset := rapid.Int64Range(suite.model.CollectionCompactionOffset[id], int64(len(suite.model.CollectionData[id]))).Draw(t, "new_position")
					_, err = suite.logServer.UpdateCollectionLogOffset(ctx, &logservicepb.UpdateCollectionLogOffsetRequest{
						CollectionId: id.String(),
						LogOffset:    compactionOffset,
					})
					if err != nil {
						t.Fatal(err)
					}
					suite.model.CollectionCompactionOffset[id] = compactionOffset
				}
			},
			"pullLogs": func(t *rapid.T) {
				c := collectionGen.Draw(t, "collection")
				// Start anywhere between the compaction offset and the number
				// of records in this collection. (Bug fix: upper bound was the
				// size of the CollectionData map, not this collection's
				// record count.)
				startOffset := rapid.Int64Range(suite.model.CollectionCompactionOffset[c], int64(len(suite.model.CollectionData[c]))).Draw(t, "start_offset")
				// If start offset is 0, we need to set it to 1 as the offset is 1 based
				if startOffset == 0 {
					startOffset = 1
				}
				batchSize := rapid.Int32Range(1, 20).Draw(t, "batch_size")
				response, err := suite.logServer.PullLogs(ctx, &logservicepb.PullLogsRequest{
					CollectionId:    c.String(),
					StartFromOffset: startOffset,
					BatchSize:       batchSize,
					EndTimestamp:    time.Now().Unix(),
				})
				if err != nil {
					t.Fatal(err)
				}
				// Verify each returned record matches the model's record at
				// the same (1-based) log offset.
				for _, record := range response.Records {
					expectedRecord := suite.model.CollectionData[c][record.LogOffset-1]
					if string(expectedRecord.Vector.Vector) != string(record.Record.Vector.Vector) {
						t.Fatalf("expect record vector %s, got %s", string(expectedRecord.Vector.Vector), string(record.Record.Vector.Vector))
					}
					if expectedRecord.Id != record.Record.Id {
						t.Fatalf("expect record id %s, got %s", expectedRecord.Id, record.Record.Id)
					}
				}

				// Verify that the first and last record offsets are correct.
				if len(response.Records) > 0 {
					lastRecord := response.Records[len(response.Records)-1]
					firstRecord := response.Records[0]
					expectedLastOffset := startOffset + int64(batchSize) - 1
					if expectedLastOffset > int64(len(suite.model.CollectionData[c])) {
						expectedLastOffset = int64(len(suite.model.CollectionData[c]))
					}
					if lastRecord.LogOffset != expectedLastOffset {
						// Bug fix: the format arguments were swapped, printing
						// the actual value as "expect" and vice versa.
						t.Fatalf("expect last record %d, got %d", expectedLastOffset, lastRecord.LogOffset)
					}
					if firstRecord.LogOffset != startOffset {
						t.Fatalf("expect first record %d, got %d", startOffset, firstRecord.LogOffset)
					}
				}
			},
		})
	})
}

func TestLogServerTestSuite(t *testing.T) {
testSuite := new(LogServerTestSuite)
testSuite.t = t
suite.Run(t, testSuite)
}
2 changes: 1 addition & 1 deletion go/pkg/log/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (s *logServer) PullLogs(ctx context.Context, req *logservicepb.PullLogsRequ
if err != nil {
return
}
records, err := s.lr.PullRecords(ctx, collectionID.String(), req.StartFromOffset, int(req.BatchSize))
records, err := s.lr.PullRecords(ctx, collectionID.String(), req.StartFromOffset, int(req.BatchSize), int(req.EndTimestamp))
if err != nil {
return
}
Expand Down
5 changes: 2 additions & 3 deletions go/shared/libs/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ func StartPgContainer(ctx context.Context) (connectionString string, err error)

func RunMigration(ctx context.Context, connectionString string) (err error) {
cmd := exec.Command("/bin/sh", "bin/migrate_up_test.sh", connectionString)

_, dir, _, _ := runtime.Caller(0)

cmd.Dir = path.Join(dir, "../../../")
byte, err := cmd.Output()
var byte []byte
byte, err = cmd.CombinedOutput()
fmt.Println(string(byte))
return
}

0 comments on commit 74cc70c

Please sign in to comment.