ClickHouse: Put numeric-string mapping behind feature flag #2305

Merged · 16 commits · Dec 2, 2024
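
This change threads a per-flow env map[string]string from the flow config through ReplayTableSchemaDeltas, ToDWHColumnType, Avro schema generation, and the Avro write path, so the ClickHouse mapping for Postgres numeric columns is resolved from per-flow dynamic settings instead of being hard-coded in the connector. The sketch below reconstructs the gated mapping from the removed getClickhouseTypeForNumericColumn helper (see normalize.go further down); the lookupBoolSetting helper, the setting name, and the exact gating semantics are assumptions, not shown in this diff.

package sketch

import (
    "context"
    "fmt"

    "github.com/PeerDB-io/peer-flow/datatypes"
    "github.com/PeerDB-io/peer-flow/generated/protos"
)

// lookupBoolSetting stands in for PeerDB's dynamic-settings lookup; the real
// helper and the setting name are assumptions for illustration only.
func lookupBoolSetting(_ context.Context, env map[string]string, name string) bool {
    return env[name] == "true"
}

// clickHouseNumericType sketches the numeric mapping this PR moves behind
// ToDWHColumnType. With the flag off it matches the removed helper:
// Decimal(p, s) when the declared precision fits, String otherwise; with the
// flag on, numerics map to String.
func clickHouseNumericType(ctx context.Context, env map[string]string, column *protos.FieldDescription) string {
    rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier)
    if lookupBoolSetting(ctx, env, "PEERDB_CLICKHOUSE_NUMERIC_AS_STRING") ||
        rawPrecision > datatypes.PeerDBClickHouseMaxPrecision {
        return "String"
    }
    precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{})
    return fmt.Sprintf("Decimal(%d, %d)", precision, scale)
}

Callers reach this logic through the new qvalue.QValueKind.ToDWHColumnType(ctx, env, protos.DBType_CLICKHOUSE, column) signature that appears throughout the diff.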
flow/activities/flowable_core.go (2 additions, 1 deletion)

@@ -225,7 +225,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
     }
     defer connectors.CloseConnector(ctx, dstConn)
 
-    if err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatchSync.SchemaDeltas); err != nil {
+    if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil {
         return nil, fmt.Errorf("failed to sync schema: %w", err)
     }
 
@@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
     })
 
     errGroup.Go(func() error {
+        var err error
        rowsSynced, err = syncRecords(dstConn, errCtx, config, partition, outstream)
        if err != nil {
            a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
flow/connectors/bigquery/bigquery.go (1 addition)

@@ -203,6 +203,7 @@ func (c *BigQueryConnector) waitForTableReady(ctx context.Context, datasetTable
 // This could involve adding or dropping multiple columns.
 func (c *BigQueryConnector) ReplayTableSchemaDeltas(
     ctx context.Context,
+    env map[string]string,
     flowJobName string,
     schemaDeltas []*protos.TableSchemaDelta,
 ) error {
flow/connectors/bigquery/qrep.go (2 additions, 2 deletions)

@@ -35,7 +35,7 @@ func (c *BigQueryConnector) SyncQRepRecords(
         partition.PartitionId, destTable))
 
     avroSync := NewQRepAvroSyncMethod(c, config.StagingPath, config.FlowJobName)
-    return avroSync.SyncQRepRecords(ctx, config.FlowJobName, destTable, partition,
+    return avroSync.SyncQRepRecords(ctx, config.Env, config.FlowJobName, destTable, partition,
         tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName)
 }
 
@@ -80,7 +80,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
         }
     }
 
-    err = c.ReplayTableSchemaDeltas(ctx, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
+    err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
     if err != nil {
         return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
     }
flow/connectors/bigquery/qrep_avro_sync.go (7 additions, 5 deletions)

@@ -55,7 +55,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
     }
 
     stagingTable := fmt.Sprintf("%s_%s_staging", rawTableName, strconv.FormatInt(syncBatchID, 10))
-    numRecords, err := s.writeToStage(ctx, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema,
+    numRecords, err := s.writeToStage(ctx, req.Env, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema,
        &datasetTable{
            project: s.connector.projectID,
            dataset: s.connector.datasetID,
@@ -97,7 +97,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
        slog.String(string(shared.FlowNameKey), req.FlowJobName),
        slog.String("dstTableName", rawTableName))
 
-    err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
+    err = s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas)
     if err != nil {
         return nil, fmt.Errorf("failed to sync schema changes: %w", err)
     }
@@ -139,6 +139,7 @@ func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softD
 
 func (s *QRepAvroSyncMethod) SyncQRepRecords(
     ctx context.Context,
+    env map[string]string,
     flowJobName string,
     dstTableName string,
     partition *protos.QRepPartition,
@@ -167,7 +168,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
        table: fmt.Sprintf("%s_%s_staging", dstDatasetTable.table,
            strings.ReplaceAll(partition.PartitionId, "-", "_")),
     }
-    numRecords, err := s.writeToStage(ctx, partition.PartitionId, flowJobName, avroSchema,
+    numRecords, err := s.writeToStage(ctx, env, partition.PartitionId, flowJobName, avroSchema,
        stagingDatasetTable, stream, flowJobName)
     if err != nil {
         return -1, fmt.Errorf("failed to push to avro stage: %w", err)
@@ -389,6 +390,7 @@ func GetAvroField(bqField *bigquery.FieldSchema) (AvroField, error) {
 
 func (s *QRepAvroSyncMethod) writeToStage(
     ctx context.Context,
+    env map[string]string,
     syncID string,
     objectFolder string,
     avroSchema *model.QRecordAvroSchemaDefinition,
@@ -408,7 +410,7 @@
     obj := bucket.Object(avroFilePath)
     w := obj.NewWriter(ctx)
 
-    numRecords, err := ocfWriter.WriteOCF(ctx, w)
+    numRecords, err := ocfWriter.WriteOCF(ctx, env, w)
     if err != nil {
         return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
     }
@@ -426,7 +428,7 @@
 
     avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
     s.connector.logger.Info("writing records to local file", idLog)
-    avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, avroFilePath)
+    avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, env, avroFilePath)
     if err != nil {
         return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
     }
flow/connectors/clickhouse/cdc.go (6 additions, 3 deletions)

@@ -93,7 +93,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
         return nil, err
     }
 
-    if err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
+    if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
         return nil, fmt.Errorf("failed to sync schema changes: %w", err)
     }
 
@@ -120,7 +120,10 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
     return res, nil
 }
 
-func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJobName string,
+func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
+    ctx context.Context,
+    env map[string]string,
+    flowJobName string,
     schemaDeltas []*protos.TableSchemaDelta,
 ) error {
     if len(schemaDeltas) == 0 {
@@ -133,7 +136,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ
         }
 
         for _, addedColumn := range schemaDelta.AddedColumns {
-            clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(protos.DBType_CLICKHOUSE)
+            clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(ctx, env, protos.DBType_CLICKHOUSE, addedColumn)
             if err != nil {
                 return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
             }
flow/connectors/clickhouse/normalize.go (10 additions, 28 deletions)

@@ -15,7 +15,6 @@ import (
     "github.com/ClickHouse/clickhouse-go/v2"
     "golang.org/x/sync/errgroup"
 
-    "github.com/PeerDB-io/peer-flow/datatypes"
     "github.com/PeerDB-io/peer-flow/generated/protos"
     "github.com/PeerDB-io/peer-flow/model"
     "github.com/PeerDB-io/peer-flow/model/qvalue"
@@ -81,16 +80,6 @@ func getColName(overrides map[string]string, name string) string {
     return name
 }
 
-func getClickhouseTypeForNumericColumn(column *protos.FieldDescription) string {
-    rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier)
-    if rawPrecision > datatypes.PeerDBClickHouseMaxPrecision {
-        return "String"
-    } else {
-        precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{})
-        return fmt.Sprintf("Decimal(%d, %d)", precision, scale)
-    }
-}
-
 func generateCreateTableSQLForNormalizedTable(
     ctx context.Context,
     config *protos.SetupNormalizedTableBatchInput,
@@ -142,14 +131,10 @@
     }
 
     if clickHouseType == "" {
-        if colType == qvalue.QValueKindNumeric {
-            clickHouseType = getClickhouseTypeForNumericColumn(column)
-        } else {
-            var err error
-            clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
-            if err != nil {
-                return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
-            }
+        var err error
+        clickHouseType, err = colType.ToDWHColumnType(ctx, config.Env, protos.DBType_CLICKHOUSE, column)
+        if err != nil {
+            return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
         }
     }
     if (tableSchema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
@@ -368,16 +353,13 @@ func (c *ClickHouseConnector) NormalizeRecords(
 
            colSelector.WriteString(fmt.Sprintf("`%s`,", dstColName))
            if clickHouseType == "" {
-                if colType == qvalue.QValueKindNumeric {
-                    clickHouseType = getClickhouseTypeForNumericColumn(column)
-                } else {
-                    var err error
-                    clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
-                    if err != nil {
-                        close(queries)
-                        return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
-                    }
+                var err error
+                clickHouseType, err = colType.ToDWHColumnType(ctx, req.Env, protos.DBType_CLICKHOUSE, column)
+                if err != nil {
+                    close(queries)
+                    return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
+                }
 
                if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
                    clickHouseType = fmt.Sprintf("Nullable(%s)", clickHouseType)
                }
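
Both call sites above previously special-cased qvalue.QValueKindNumeric through the local getClickhouseTypeForNumericColumn helper. Folding that logic into ToDWHColumnType removes the duplication, lets the CDC path in cdc.go pick up the same mapping, and drops this file's direct dependency on the datatypes package; passing the column along gives the shared implementation access to the type modifier the helper used to read.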
flow/connectors/clickhouse/qrep_avro_sync.go (5 additions, 3 deletions)

@@ -71,7 +71,7 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
     s.logger.Info("sync function called and schema acquired",
        slog.String("dstTable", dstTableName))
 
-    avroSchema, err := s.getAvroSchema(dstTableName, schema)
+    avroSchema, err := s.getAvroSchema(ctx, env, dstTableName, schema)
     if err != nil {
         return 0, err
     }
@@ -106,7 +106,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
     stagingPath := s.credsProvider.BucketPath
     startTime := time.Now()
 
-    avroSchema, err := s.getAvroSchema(dstTableName, stream.Schema())
+    avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema())
     if err != nil {
         return 0, err
     }
@@ -165,10 +165,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
 }
 
 func (s *ClickHouseAvroSyncMethod) getAvroSchema(
+    ctx context.Context,
+    env map[string]string,
     dstTableName string,
     schema qvalue.QRecordSchema,
 ) (*model.QRecordAvroSchemaDefinition, error) {
-    avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, protos.DBType_CLICKHOUSE)
+    avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_CLICKHOUSE)
     if err != nil {
         return nil, fmt.Errorf("failed to define Avro schema: %w", err)
     }
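
The env map reaches all the way into Avro schema generation (model.GetAvroSchemaDefinition) and, in the BigQuery connector above, the OCF writers (WriteOCF, WriteRecordsToAvroFile). The likely reason, inferred from these signatures rather than stated in the PR, is that staged Avro files must encode numeric values consistently with whichever destination column type the setting selects.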
flow/connectors/core.go (1 addition, 3 deletions)

@@ -173,7 +173,7 @@ type CDCSyncConnectorCore interface {
     // ReplayTableSchemaDelta changes a destination table to match the schema at source
     // This could involve adding or dropping multiple columns.
     // Connectors which are non-normalizing should implement this as a nop.
-    ReplayTableSchemaDeltas(ctx context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error
+    ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error
 }
 
 type CDCSyncConnector interface {
@@ -463,8 +463,6 @@ var (
     _ CDCSyncConnector = &connclickhouse.ClickHouseConnector{}
     _ CDCSyncConnector = &connelasticsearch.ElasticsearchConnector{}
 
-    _ CDCSyncPgConnector = &connpostgres.PostgresConnector{}
-
     _ CDCNormalizeConnector = &connpostgres.PostgresConnector{}
     _ CDCNormalizeConnector = &connbigquery.BigQueryConnector{}
     _ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{}
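
Because ReplayTableSchemaDeltas sits on the CDCSyncConnectorCore interface, every sync connector must accept the new parameter even when it has no use for it: the elasticsearch, eventhub, kafka, pubsub, and s3 connectors below take it as _ map[string]string and keep their no-op behavior.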
flow/connectors/elasticsearch/elasticsearch.go (1 addition, 1 deletion)

@@ -95,7 +95,7 @@ func (esc *ElasticsearchConnector) CreateRawTable(ctx context.Context,
 }
 
 // we handle schema changes by not handling them since no mapping is being enforced right now
-func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context,
+func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string,
     flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
 ) error {
     return nil
flow/connectors/eventhub/eventhub.go (3 additions, 1 deletion)

@@ -380,7 +380,9 @@ func (c *EventHubConnector) CreateRawTable(ctx context.Context, req *protos.Crea
     }, nil
 }
 
-func (c *EventHubConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
+func (c *EventHubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
+    flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
+) error {
     c.logger.Info("ReplayTableSchemaDeltas for event hub is a no-op")
     return nil
 }
flow/connectors/kafka/kafka.go (3 additions, 1 deletion)

@@ -149,7 +149,9 @@ func (c *KafkaConnector) SetupMetadataTables(_ context.Context) error {
     return nil
 }
 
-func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
+func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
+    flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
+) error {
     return nil
 }
 
flow/connectors/postgres/postgres.go (2 additions, 1 deletion)

@@ -592,7 +592,7 @@ func syncRecordsCore[Items model.Items](
         return nil, err
     }
 
-    err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
+    err = c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas)
     if err != nil {
         return nil, fmt.Errorf("failed to sync schema changes: %w", err)
     }
@@ -941,6 +941,7 @@ func (c *PostgresConnector) SetupNormalizedTable(
 // This could involve adding or dropping multiple columns.
 func (c *PostgresConnector) ReplayTableSchemaDeltas(
     ctx context.Context,
+    _ map[string]string,
     flowJobName string,
     schemaDeltas []*protos.TableSchemaDelta,
 ) error {
flow/connectors/postgres/postgres_schema_delta_test.go (4 additions, 4 deletions)

@@ -58,7 +58,7 @@ func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
        fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
     require.NoError(s.t, err)
 
-    err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{
+    err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{
        SrcTableName: tableName,
        DstTableName: tableName,
        AddedColumns: []*protos.FieldDescription{
@@ -113,7 +113,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
        }
     }
 
-    err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{
+    err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{
        SrcTableName: tableName,
        DstTableName: tableName,
        AddedColumns: addedColumns,
@@ -144,7 +144,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
        }
     }
 
-    err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{
+    err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{
        SrcTableName: tableName,
        DstTableName: tableName,
        AddedColumns: addedColumns,
@@ -175,7 +175,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
        }
     }
 
-    err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{
+    err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{
        SrcTableName: tableName,
        DstTableName: tableName,
        AddedColumns: addedColumns,
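
The Postgres connector ignores the setting (its implementation takes _ map[string]string), so these tests pass nil for the new argument.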
flow/connectors/pubsub/pubsub.go (3 additions, 1 deletion)

@@ -67,7 +67,9 @@ func (c *PubSubConnector) CreateRawTable(ctx context.Context, req *protos.Create
     return &protos.CreateRawTableOutput{TableIdentifier: "n/a"}, nil
 }
 
-func (c *PubSubConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
+func (c *PubSubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
+    flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
+) error {
     return nil
 }
 
flow/connectors/s3/qrep.go (4 additions, 2 deletions)

@@ -20,7 +20,7 @@ func (c *S3Connector) SyncQRepRecords(
     schema := stream.Schema()
 
     dstTableName := config.DestinationTableIdentifier
-    avroSchema, err := getAvroSchema(dstTableName, schema)
+    avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema)
     if err != nil {
         return 0, err
     }
@@ -34,10 +34,12 @@
 }
 
 func getAvroSchema(
+    ctx context.Context,
+    env map[string]string,
     dstTableName string,
     schema qvalue.QRecordSchema,
 ) (*model.QRecordAvroSchemaDefinition, error) {
-    avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, protos.DBType_S3)
+    avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_S3)
     if err != nil {
         return nil, fmt.Errorf("failed to define Avro schema: %w", err)
     }
flow/connectors/s3/s3.go (3 additions, 1 deletion)

@@ -118,7 +118,9 @@ func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsReq
     }, nil
 }
 
-func (c *S3Connector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
+func (c *S3Connector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
+    flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
+) error {
     c.logger.Info("ReplayTableSchemaDeltas for S3 is a no-op")
     return nil
 }