
add e2e tests and enable other data types
pankaj-peerdb committed Jan 16, 2024
1 parent 773f61c commit 0845521
Showing 14 changed files with 678 additions and 38 deletions.
10 changes: 10 additions & 0 deletions flow/cmd/handler.go
@@ -663,6 +663,16 @@ func (h *FlowRequestHandler) CreatePeer(
}
s3Config := s3ConfigObject.S3Config
encodedConfig, encodingErr = proto.Marshal(s3Config)
case protos.DBType_CLICKHOUSE:
chConfigObject, ok := config.(*protos.Peer_ClickhouseConfig)
if !ok {
return wrongConfigResponse, nil
}
chConfig := chConfigObject.ClickhouseConfig
encodedConfig, encodingErr = proto.Marshal(chConfig)
default:
return wrongConfigResponse, nil
}
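
The new branch follows the same pattern as the S3 and Snowflake cases above: unwrap the ClickHouse config from the peer's oneof, then serialize it with proto.Marshal before it is persisted. For orientation, here is a minimal sketch of the reverse path that decodes a stored blob back into a ClickhouseConfig; the helper below is illustrative and not part of this commit.

// Illustrative sketch only: decode a persisted peer config back into a
// ClickhouseConfig. Assumes the generated protos package used in handler.go.
package sketch

import (
	"fmt"

	"github.com/PeerDB-io/peer-flow/generated/protos"
	"google.golang.org/protobuf/proto"
)

func decodeClickhouseConfig(encodedConfig []byte) (*protos.ClickhouseConfig, error) {
	var chConfig protos.ClickhouseConfig
	if err := proto.Unmarshal(encodedConfig, &chConfig); err != nil {
		return nil, fmt.Errorf("failed to unmarshal clickhouse config: %w", err)
	}
	return &chConfig, nil
}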
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/qrep_avro_sync.go
@@ -92,7 +92,7 @@ func (s *ClickhouseAvroSyncMethod) getAvroSchema(
dstTableName string,
schema *model.QRecordSchema,
) (*model.QRecordAvroSchemaDefinition, error) {
avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema)
avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, qvalue.QDWHTypeClickhouse)
if err != nil {
return nil, fmt.Errorf("failed to define Avro schema: %w", err)
}
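
The only change in this hunk is the extra argument: GetAvroSchemaDefinition now receives the destination warehouse type (qvalue.QDWHTypeClickhouse), so Avro field types can be chosen per warehouse. The actual mapping lives in the model package and is not shown in this diff; the sketch below is one plausible shape for such a switch. The QDWHType parameter type is inferred from the constants used in this diff, the numeric handling is an assumption rather than taken from this commit, and the qvalue import is presumed.

// Hypothetical illustration of warehouse-aware Avro typing; the real
// model.GetAvroSchemaDefinition may map kinds differently.
func avroTypeForNumeric(dwhType qvalue.QDWHType) interface{} {
	switch dwhType {
	case qvalue.QDWHTypeClickhouse:
		// Assumption: ClickHouse Avro ingestion is simplest with numerics as strings.
		return "string"
	default:
		// Assumption: other warehouses use Avro's decimal logical type.
		return map[string]interface{}{
			"type":        "bytes",
			"logicalType": "decimal",
			"precision":   38,
			"scale":       9,
		}
	}
}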
58 changes: 33 additions & 25 deletions flow/connectors/clickhouse/qvalue_convert.go
@@ -6,32 +6,40 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// TODO: remove extra types from here
var clickhouseTypeToQValueKindMap = map[string]qvalue.QValueKind{
"INT": qvalue.QValueKindInt32,
"BIGINT": qvalue.QValueKindInt64,
"FLOAT": qvalue.QValueKindFloat64,
"DOUBLE": qvalue.QValueKindFloat64,
"REAL": qvalue.QValueKindFloat64,
"VARCHAR": qvalue.QValueKindString,
"CHAR": qvalue.QValueKindString,
"TEXT": qvalue.QValueKindString,
"BOOLEAN": qvalue.QValueKindBoolean,
"DATETIME": qvalue.QValueKindTimestamp,
"TIMESTAMP": qvalue.QValueKindTimestamp,
"TIMESTAMP_NTZ": qvalue.QValueKindTimestamp,
"TIMESTAMP_TZ": qvalue.QValueKindTimestampTZ,
"TIME": qvalue.QValueKindTime,
"DATE": qvalue.QValueKindDate,
"BLOB": qvalue.QValueKindBytes,
"BYTEA": qvalue.QValueKindBytes,
"BINARY": qvalue.QValueKindBytes,
"FIXED": qvalue.QValueKindNumeric,
"NUMBER": qvalue.QValueKindNumeric,
"DECIMAL": qvalue.QValueKindNumeric,
"NUMERIC": qvalue.QValueKindNumeric,
"VARIANT": qvalue.QValueKindJSON,
"GEOMETRY": qvalue.QValueKindGeometry,
"GEOGRAPHY": qvalue.QValueKindGeography,
"INT": qvalue.QValueKindInt32,
"Int64": qvalue.QValueKindInt64,
"Int16": qvalue.QValueKindInt16,
"Float64": qvalue.QValueKindFloat64,
"DOUBLE": qvalue.QValueKindFloat64,
"REAL": qvalue.QValueKindFloat64,
"VARCHAR": qvalue.QValueKindString,
"CHAR": qvalue.QValueKindString,
"TEXT": qvalue.QValueKindString,
"String": qvalue.QValueKindString,
"Bool": qvalue.QValueKindBoolean,
"DateTime": qvalue.QValueKindTimestamp,
"TIMESTAMP": qvalue.QValueKindTimestamp,
"DateTime64(6)": qvalue.QValueKindTimestamp,
"TIMESTAMP_NTZ": qvalue.QValueKindTimestamp,
"TIMESTAMP_TZ": qvalue.QValueKindTimestampTZ,
"TIME": qvalue.QValueKindTime,
"DATE": qvalue.QValueKindDate,
"BLOB": qvalue.QValueKindBytes,
"BYTEA": qvalue.QValueKindBytes,
"BINARY": qvalue.QValueKindBytes,
"FIXED": qvalue.QValueKindNumeric,
"NUMBER": qvalue.QValueKindNumeric,
"DECIMAL": qvalue.QValueKindNumeric,
"NUMERIC": qvalue.QValueKindNumeric,
"VARIANT": qvalue.QValueKindJSON,
"GEOMETRY": qvalue.QValueKindGeometry,
"GEOGRAPHY": qvalue.QValueKindGeography,
"Array(String)": qvalue.QValueKindArrayString,
"Array(Int32)": qvalue.QValueKindArrayInt32,
"Array(Int64)": qvalue.QValueKindArrayInt64,
"Array(Float64)": qvalue.QValueKindArrayFloat64,
}
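
Column types reported by ClickHouse frequently arrive wrapped, for example Nullable(String) or DateTime64(6), so an exact-match lookup against this map can miss. A minimal sketch of a tolerant lookup that strips a Nullable(...) wrapper first is shown below; the helper name and the QValueKindInvalid fallback are assumptions for illustration, and fmt/strings are presumed imported.

// Sketch: resolve a ClickHouse column type to a QValueKind, tolerating a
// Nullable(...) wrapper. Hypothetical helper, not part of this commit.
func clickhouseTypeToQValueKind(chType string) (qvalue.QValueKind, error) {
	if strings.HasPrefix(chType, "Nullable(") && strings.HasSuffix(chType, ")") {
		chType = chType[len("Nullable(") : len(chType)-1]
	}
	if kind, ok := clickhouseTypeToQValueKindMap[chType]; ok {
		return kind, nil
	}
	return qvalue.QValueKindInvalid, fmt.Errorf("unsupported ClickHouse column type: %s", chType)
}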

func qValueKindToClickhouseType(colType qvalue.QValueKind) (string, error) {
14 changes: 13 additions & 1 deletion flow/connectors/core.go
@@ -12,6 +12,7 @@ import (
conns3 "github.com/PeerDB-io/peer-flow/connectors/s3"
connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake"
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/jackc/pgx/v5/pgxpool"
@@ -197,6 +198,8 @@ func GetQRepSyncConnector(ctx context.Context, config *protos.Peer) (QRepSyncCon
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())
case *protos.Peer_S3Config:
return conns3.NewS3Connector(ctx, config.GetS3Config())
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
default:
return nil, ErrUnsupportedFunctionality
}

@@ -242,6 +245,14 @@ func GetConnector(ctx context.Context, peer *protos.Peer) (Connector, error) {
return conns3.NewS3Connector(ctx, s3Config)
// case protos.DBType_EVENTHUB:
// return connsqlserver.NewSQLServerConnector(ctx, config.GetSqlserverConfig())
case protos.DBType_CLICKHOUSE:
clickhouseConfig := peer.GetClickhouseConfig()
if clickhouseConfig == nil {
return nil, fmt.Errorf("missing clickhouse config for %s peer %s", peer.Type.String(), peer.Name)
}
return connclickhouse.NewClickhouseConnector(ctx, clickhouseConfig)
default:
return nil, fmt.Errorf("unsupported peer type %s", peer.Type.String())
}
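
With this case in place, a ClickHouse peer flows through the same GetConnector entry point as the other warehouses. A rough usage sketch follows; the peer literal is a placeholder (a real config carries host, port, and credentials), and the import paths mirror this repository's layout.

// Illustrative only: resolving a connector for a ClickHouse peer.
package main

import (
	"context"
	"log"

	"github.com/PeerDB-io/peer-flow/connectors"
	"github.com/PeerDB-io/peer-flow/generated/protos"
)

func main() {
	peer := &protos.Peer{
		Name: "ch_peer",
		Type: protos.DBType_CLICKHOUSE,
		Config: &protos.Peer_ClickhouseConfig{
			// Placeholder config; real values (host, port, credentials) omitted.
			ClickhouseConfig: &protos.ClickhouseConfig{},
		},
	}

	conn, err := connectors.GetConnector(context.Background(), peer)
	if err != nil {
		log.Fatalf("failed to get connector: %v", err)
	}
	defer conn.Close()
	log.Println("obtained ClickHouse connector")
}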
@@ -254,7 +265,8 @@ func GetQRepConsolidateConnector(ctx context.Context,
switch inner.(type) {
case *protos.Peer_SnowflakeConfig:
return connsnowflake.NewSnowflakeConnector(ctx, config.GetSnowflakeConfig())

case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickhouseConnector(ctx, config.GetClickhouseConfig())
default:
return nil, ErrUnsupportedFunctionality
}
2 changes: 1 addition & 1 deletion flow/connectors/s3/qrep.go
@@ -43,7 +43,7 @@ func getAvroSchema(
dstTableName string,
schema *model.QRecordSchema,
) (*model.QRecordAvroSchemaDefinition, error) {
avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema)
avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, qvalue.QDWHTypeS3)
if err != nil {
return nil, fmt.Errorf("failed to define Avro schema: %w", err)
}
10 changes: 5 additions & 5 deletions flow/connectors/snowflake/avro_file_writer_test.go
@@ -142,7 +142,7 @@ func TestWriteRecordsToAvroFileHappyPath(t *testing.T) {
// Define sample data
records, schema := generateRecords(t, true, 10, false)

avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake)
require.NoError(t, err)

t.Logf("[test] avroSchema: %v", avroSchema)
@@ -170,7 +170,7 @@ func TestWriteRecordsToZstdAvroFileHappyPath(t *testing.T) {
// Define sample data
records, schema := generateRecords(t, true, 10, false)

avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake)
require.NoError(t, err)

t.Logf("[test] avroSchema: %v", avroSchema)
@@ -198,7 +198,7 @@ func TestWriteRecordsToDeflateAvroFileHappyPath(t *testing.T) {
// Define sample data
records, schema := generateRecords(t, true, 10, false)

avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake)
require.NoError(t, err)

t.Logf("[test] avroSchema: %v", avroSchema)
@@ -225,7 +225,7 @@ func TestWriteRecordsToAvroFileNonNull(t *testing.T) {

records, schema := generateRecords(t, false, 10, false)

avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake)
require.NoError(t, err)

t.Logf("[test] avroSchema: %v", avroSchema)
@@ -253,7 +253,7 @@ func TestWriteRecordsToAvroFileAllNulls(t *testing.T) {
// Define sample data
records, schema := generateRecords(t, true, 10, true)

avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema)
avroSchema, err := model.GetAvroSchemaDefinition("not_applicable", schema, qvalue.QDWHTypeSnowflake)
require.NoError(t, err)

t.Logf("[test] avroSchema: %v", avroSchema)
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/qrep_avro_sync.go
@@ -216,7 +216,7 @@ func (s *SnowflakeAvroSyncMethod) getAvroSchema(
dstTableName string,
schema *model.QRecordSchema,
) (*model.QRecordAvroSchemaDefinition, error) {
avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema)
avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, qvalue.QDWHTypeSnowflake)
if err != nil {
return nil, fmt.Errorf("failed to define Avro schema: %w", err)
}
4 changes: 4 additions & 0 deletions flow/connectors/sql/query_executor.go
@@ -115,6 +115,10 @@ func (g *GenericSQLQueryExecutor) CreateTable(schema *model.QRecordSchema, schem

command := fmt.Sprintf("CREATE TABLE %s.%s (%s)", schemaName, tableName, strings.Join(fields, ", "))

// TODO: this looks brittle; consider moving CreateTable into a ClickHouse-specific implementation
if strings.Contains(tableName, "_ch_") {
command += " ENGINE = MergeTree() ORDER BY id"
}
_, err := g.db.ExecContext(g.ctx, command)
if err != nil {
return fmt.Errorf("failed to create table: %w", err)
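As the TODO above notes, keying the engine clause off a "_ch_" substring in the table name is brittle. One possible direction, sketched here purely as an assumption and not something this commit implements, is to give the ClickHouse connector its own CreateTable so every destination table gets an explicit engine; the method shape and the execQuery helper below are illustrative.

// Sketch of a ClickHouse-specific CreateTable; the receiver's execQuery
// helper and the ORDER BY id choice are assumptions for illustration.
func (c *ClickhouseConnector) CreateTable(schemaName, tableName string, columns []string) error {
	command := fmt.Sprintf(
		"CREATE TABLE %s.%s (%s) ENGINE = MergeTree() ORDER BY id",
		schemaName, tableName, strings.Join(columns, ", "),
	)
	return c.execQuery(command)
}
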
179 changes: 179 additions & 0 deletions flow/e2e/clickhouse/clickhouse_helper.go
@@ -0,0 +1,179 @@
package e2e_clickhouse

import (
"context"
"encoding/json"
"fmt"
"math/big"

connclickhouse "github.com/PeerDB-io/peer-flow/connectors/clickhouse"
"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)

type ClickhouseTestHelper struct {
// config is the Clickhouse config.
Config *protos.ClickhouseConfig
// peer struct holding the Clickhouse config
Peer *protos.Peer
// connection to another database, to manage the test database
adminClient *connclickhouse.ClickhouseClient
// connection to the test database
testClient *connclickhouse.ClickhouseClient
// testSchemaName is the schema to use for testing.
//testSchemaName string
// dbName is the database used for testing.
testDatabaseName string
}

func NewClickhouseTestHelper() (*ClickhouseTestHelper, error) {
fmt.Printf("\n******************** NewClickhouseTestHelper 1")
//jsonPath := os.Getenv("TEST_CF_CREDS")
jsonPath := "./ch.json"
fmt.Printf("\n******************** NewClickhouseTestHelper 2 jsonPath %+v", jsonPath)
if jsonPath == "" {
return nil, fmt.Errorf("TEST_CF_CREDS env var not set")
}

content, err := e2e.ReadFileToBytes(jsonPath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}

var config protos.ClickhouseConfig
err = json.Unmarshal(content, &config)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal json: %w", err)
}

peer := generateCHPeer(&config)
runID, err := shared.RandomUInt64()
if err != nil {
return nil, fmt.Errorf("failed to generate random uint64: %w", err)
}

testDatabaseName := fmt.Sprintf("e2e_test_%d", runID)

adminClient, err := connclickhouse.NewClickhouseClient(context.Background(), &config)
if err != nil {
return nil, fmt.Errorf("failed to create Clickhouse client: %w", err)
}
err = adminClient.ExecuteQuery(fmt.Sprintf("CREATE DATABASE %s", testDatabaseName))
if err != nil {
return nil, fmt.Errorf("failed to create Clickhouse test database: %w", err)
}

config.Database = testDatabaseName
testClient, err := connclickhouse.NewClickhouseClient(context.Background(), &config)
if err != nil {
return nil, fmt.Errorf("failed to create Clickhouse client: %w", err)
}

return &ClickhouseTestHelper{
Config: &config,
Peer: peer,
adminClient: adminClient,
testClient: testClient,
//testSchemaName: "PUBLIC",
testDatabaseName: testDatabaseName,
}, nil
}

func generateCHPeer(clickhouseConfig *protos.ClickhouseConfig) *protos.Peer {
ret := &protos.Peer{}
ret.Name = "test_ch_peer"
ret.Type = protos.DBType_CLICKHOUSE

ret.Config = &protos.Peer_ClickhouseConfig{
ClickhouseConfig: clickhouseConfig,
}

return ret
}

// Cleanup drops the database.
func (s *ClickhouseTestHelper) Cleanup() error {
err := s.testClient.Close()
if err != nil {
return err
}
err = s.adminClient.ExecuteQuery(fmt.Sprintf("DROP DATABASE %s", s.testDatabaseName))
if err != nil {
return err
}
return s.adminClient.Close()
}

// RunCommand runs the given command.
func (s *ClickhouseTestHelper) RunCommand(command string) error {
return s.testClient.ExecuteQuery(command)
}

// CountRows(tableName) returns the number of rows in the given table.
func (s *ClickhouseTestHelper) CountRows(tableName string) (int, error) {
res, err := s.testClient.CountRows(s.testDatabaseName, tableName)
if err != nil {
return 0, err
}

return int(res), nil
}

// CountNonNullRows returns the number of rows in the given table where the given column is not NULL.
func (s *ClickhouseTestHelper) CountNonNullRows(tableName string, columnName string) (int, error) {
res, err := s.testClient.CountNonNullRows(s.testDatabaseName, tableName, columnName)
if err != nil {
return 0, err
}

return int(res), nil
}

func (s *ClickhouseTestHelper) CheckNull(tableName string, colNames []string) (bool, error) {
return s.testClient.CheckNull(s.testDatabaseName, tableName, colNames)
}

func (s *ClickhouseTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecordBatch, error) {
return s.testClient.ExecuteAndProcessQuery(query)
}

func (s *ClickhouseTestHelper) CreateTable(tableName string, schema *model.QRecordSchema) error {
return s.testClient.CreateTable(schema, s.testDatabaseName, tableName)
}

// RunIntQuery runs a query that is expected to return a single int result.
func (s *ClickhouseTestHelper) RunIntQuery(query string) (int, error) {
rows, err := s.testClient.ExecuteAndProcessQuery(query)
if err != nil {
return 0, err
}

numRecords := 0
if rows == nil || len(rows.Records) != 1 {
if rows != nil {
numRecords = len(rows.Records)
}
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 rows", query, numRecords)
}

rec := rows.Records[0]
if rec.NumEntries != 1 {
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, rec.NumEntries)
}

switch rec.Entries[0].Kind {
case qvalue.QValueKindInt32:
return int(rec.Entries[0].Value.(int32)), nil
case qvalue.QValueKindInt64:
return int(rec.Entries[0].Value.(int64)), nil
case qvalue.QValueKindNumeric:
// get big.Rat and convert to int
rat := rec.Entries[0].Value.(*big.Rat)
return int(rat.Num().Int64() / rat.Denom().Int64()), nil
default:
return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec.Entries[0].Kind)
}
}
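
The helper above is what the new e2e tests drive: each run creates an isolated e2e_test_<runID> database, the test creates tables and mirrors data into it, row counts are asserted, and Cleanup drops the database. A rough sketch of that flow from within the same package is shown below; the table definition and error handling are illustrative and not taken from the test file, which is not shown here.

// Sketch: how a test might drive the helper. Illustrative only.
func exampleClickhouseE2EFlow() error {
	chHelper, err := NewClickhouseTestHelper()
	if err != nil {
		return err
	}
	// Drop the per-run test database when done.
	defer func() { _ = chHelper.Cleanup() }()

	// Create a destination table inside the per-run test database.
	err = chHelper.RunCommand("CREATE TABLE test_table (id Int64) ENGINE = MergeTree() ORDER BY id")
	if err != nil {
		return err
	}

	// After a mirror has synced rows, assertions would compare counts.
	count, err := chHelper.CountRows("test_table")
	if err != nil {
		return err
	}
	fmt.Printf("rows in test_table: %d\n", count)
	return nil
}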