Skip to content

Commit

Permalink
Merge branch 'main' into ch-experimental-json
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Dec 3, 2024
2 parents 9bf6756 + 1f07e7b commit 5d2d6ff
Show file tree
Hide file tree
Showing 39 changed files with 434 additions and 211 deletions.
3 changes: 2 additions & 1 deletion flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}
defer connectors.CloseConnector(ctx, dstConn)

if err := dstConn.ReplayTableSchemaDeltas(ctx, flowName, recordBatchSync.SchemaDeltas); err != nil {
if err := dstConn.ReplayTableSchemaDeltas(ctx, config.Env, flowName, recordBatchSync.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema: %w", err)
}

Expand Down Expand Up @@ -440,6 +440,7 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
})

errGroup.Go(func() error {
var err error
rowsSynced, err = syncRecords(dstConn, errCtx, config, partition, outstream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (c *BigQueryConnector) waitForTableReady(ctx context.Context, datasetTable
// This could involve adding or dropping multiple columns.
func (c *BigQueryConnector) ReplayTableSchemaDeltas(
ctx context.Context,
env map[string]string,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *BigQueryConnector) SyncQRepRecords(
partition.PartitionId, destTable))

avroSync := NewQRepAvroSyncMethod(c, config.StagingPath, config.FlowJobName)
return avroSync.SyncQRepRecords(ctx, config.FlowJobName, destTable, partition,
return avroSync.SyncQRepRecords(ctx, config.Env, config.FlowJobName, destTable, partition,
tblMetadata, stream, config.SyncedAtColName, config.SoftDeleteColName)
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
}
}

err = c.ReplayTableSchemaDeltas(ctx, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
if err != nil {
return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
}
Expand Down
12 changes: 7 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
}

stagingTable := fmt.Sprintf("%s_%s_staging", rawTableName, strconv.FormatInt(syncBatchID, 10))
numRecords, err := s.writeToStage(ctx, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema,
numRecords, err := s.writeToStage(ctx, req.Env, strconv.FormatInt(syncBatchID, 10), rawTableName, avroSchema,
&datasetTable{
project: s.connector.projectID,
dataset: s.connector.datasetID,
Expand Down Expand Up @@ -97,7 +97,7 @@ func (s *QRepAvroSyncMethod) SyncRecords(
slog.String(string(shared.FlowNameKey), req.FlowJobName),
slog.String("dstTableName", rawTableName))

err = s.connector.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
err = s.connector.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}
Expand Down Expand Up @@ -139,6 +139,7 @@ func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softD

func (s *QRepAvroSyncMethod) SyncQRepRecords(
ctx context.Context,
env map[string]string,
flowJobName string,
dstTableName string,
partition *protos.QRepPartition,
Expand Down Expand Up @@ -167,7 +168,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
table: fmt.Sprintf("%s_%s_staging", dstDatasetTable.table,
strings.ReplaceAll(partition.PartitionId, "-", "_")),
}
numRecords, err := s.writeToStage(ctx, partition.PartitionId, flowJobName, avroSchema,
numRecords, err := s.writeToStage(ctx, env, partition.PartitionId, flowJobName, avroSchema,
stagingDatasetTable, stream, flowJobName)
if err != nil {
return -1, fmt.Errorf("failed to push to avro stage: %w", err)
Expand Down Expand Up @@ -389,6 +390,7 @@ func GetAvroField(bqField *bigquery.FieldSchema) (AvroField, error) {

func (s *QRepAvroSyncMethod) writeToStage(
ctx context.Context,
env map[string]string,
syncID string,
objectFolder string,
avroSchema *model.QRecordAvroSchemaDefinition,
Expand All @@ -408,7 +410,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
obj := bucket.Object(avroFilePath)
w := obj.NewWriter(ctx)

numRecords, err := ocfWriter.WriteOCF(ctx, w)
numRecords, err := ocfWriter.WriteOCF(ctx, env, w)
if err != nil {
return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
}
Expand All @@ -426,7 +428,7 @@ func (s *QRepAvroSyncMethod) writeToStage(

avroFilePath := fmt.Sprintf("%s/%s.avro", tmpDir, syncID)
s.connector.logger.Info("writing records to local file", idLog)
avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, avroFilePath)
avroFile, err = ocfWriter.WriteRecordsToAvroFile(ctx, env, avroFilePath)
if err != nil {
return 0, fmt.Errorf("failed to write records to local Avro file: %w", err)
}
Expand Down
9 changes: 6 additions & 3 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
return nil, err
}

if err := c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
if err := c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas); err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}

Expand All @@ -130,7 +130,10 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return res, nil
}

func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJobName string,
func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
ctx context.Context,
env map[string]string,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
if len(schemaDeltas) == 0 {
Expand All @@ -143,7 +146,7 @@ func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ
}

for _, addedColumn := range schemaDelta.AddedColumns {
clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(protos.DBType_CLICKHOUSE)
clickHouseColType, err := qvalue.QValueKind(addedColumn.Type).ToDWHColumnType(ctx, env, protos.DBType_CLICKHOUSE, addedColumn)
if err != nil {
return fmt.Errorf("failed to convert column type %s to ClickHouse type: %w", addedColumn.Type, err)
}
Expand Down
38 changes: 10 additions & 28 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"golang.org/x/sync/errgroup"

"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -81,16 +80,6 @@ func getColName(overrides map[string]string, name string) string {
return name
}

func getClickhouseTypeForNumericColumn(column *protos.FieldDescription) string {
rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier)
if rawPrecision > datatypes.PeerDBClickHouseMaxPrecision {
return "String"
} else {
precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{})
return fmt.Sprintf("Decimal(%d, %d)", precision, scale)
}
}

func generateCreateTableSQLForNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
Expand Down Expand Up @@ -142,14 +131,10 @@ func generateCreateTableSQLForNormalizedTable(
}

if clickHouseType == "" {
if colType == qvalue.QValueKindNumeric {
clickHouseType = getClickhouseTypeForNumericColumn(column)
} else {
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
}
var err error
clickHouseType, err = colType.ToDWHColumnType(ctx, config.Env, protos.DBType_CLICKHOUSE, column)
if err != nil {
return "", fmt.Errorf("error while converting column type to ClickHouse type: %w", err)
}
}
if (tableSchema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
Expand Down Expand Up @@ -374,16 +359,13 @@ func (c *ClickHouseConnector) NormalizeRecords(

colSelector.WriteString(fmt.Sprintf("`%s`,", dstColName))
if clickHouseType == "" {
if colType == qvalue.QValueKindNumeric {
clickHouseType = getClickhouseTypeForNumericColumn(column)
} else {
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
var err error
clickHouseType, err = colType.ToDWHColumnType(ctx, req.Env, protos.DBType_CLICKHOUSE, column)
if err != nil {
close(queries)
return nil, fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}

if (schema.NullableEnabled || columnNullableEnabled) && column.Nullable && !colType.IsArray() {
clickHouseType = fmt.Sprintf("Nullable(%s)", clickHouseType)
}
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
s.logger.Info("sync function called and schema acquired",
slog.String("dstTable", dstTableName))

avroSchema, err := s.getAvroSchema(dstTableName, schema)
avroSchema, err := s.getAvroSchema(ctx, env, dstTableName, schema)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
stagingPath := s.credsProvider.BucketPath
startTime := time.Now()

avroSchema, err := s.getAvroSchema(dstTableName, stream.Schema())
avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema())
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -168,10 +168,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
}

func (s *ClickHouseAvroSyncMethod) getAvroSchema(
ctx context.Context,
env map[string]string,
dstTableName string,
schema qvalue.QRecordSchema,
) (*model.QRecordAvroSchemaDefinition, error) {
avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, protos.DBType_CLICKHOUSE)
avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_CLICKHOUSE)
if err != nil {
return nil, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand Down
4 changes: 1 addition & 3 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ type CDCSyncConnectorCore interface {
// ReplayTableSchemaDelta changes a destination table to match the schema at source
// This could involve adding or dropping multiple columns.
// Connectors which are non-normalizing should implement this as a nop.
ReplayTableSchemaDeltas(ctx context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error
ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error
}

type CDCSyncConnector interface {
Expand Down Expand Up @@ -463,8 +463,6 @@ var (
_ CDCSyncConnector = &connclickhouse.ClickHouseConnector{}
_ CDCSyncConnector = &connelasticsearch.ElasticsearchConnector{}

_ CDCSyncPgConnector = &connpostgres.PostgresConnector{}

_ CDCNormalizeConnector = &connpostgres.PostgresConnector{}
_ CDCNormalizeConnector = &connbigquery.BigQueryConnector{}
_ CDCNormalizeConnector = &connsnowflake.SnowflakeConnector{}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (esc *ElasticsearchConnector) CreateRawTable(ctx context.Context,
}

// we handle schema changes by not handling them since no mapping is being enforced right now
func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context,
func (esc *ElasticsearchConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string,
flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
) error {
return nil
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,9 @@ func (c *EventHubConnector) CreateRawTable(ctx context.Context, req *protos.Crea
}, nil
}

func (c *EventHubConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
func (c *EventHubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
) error {
c.logger.Info("ReplayTableSchemaDeltas for event hub is a no-op")
return nil
}
4 changes: 3 additions & 1 deletion flow/connectors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ func (c *KafkaConnector) SetupMetadataTables(_ context.Context) error {
return nil
}

func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
func (c *KafkaConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
) error {
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func syncRecordsCore[Items model.Items](
return nil, err
}

err = c.ReplayTableSchemaDeltas(ctx, req.FlowJobName, req.Records.SchemaDeltas)
err = c.ReplayTableSchemaDeltas(ctx, req.Env, req.FlowJobName, req.Records.SchemaDeltas)
if err != nil {
return nil, fmt.Errorf("failed to sync schema changes: %w", err)
}
Expand Down Expand Up @@ -941,6 +941,7 @@ func (c *PostgresConnector) SetupNormalizedTable(
// This could involve adding or dropping multiple columns.
func (c *PostgresConnector) ReplayTableSchemaDeltas(
ctx context.Context,
_ map[string]string,
flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/postgres/postgres_schema_delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s PostgresSchemaDeltaTestSuite) TestSimpleAddColumn() {
fmt.Sprintf("CREATE TABLE %s(id INT PRIMARY KEY)", tableName))
require.NoError(s.t, err)

err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{
err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{
SrcTableName: tableName,
DstTableName: tableName,
AddedColumns: []*protos.FieldDescription{
Expand Down Expand Up @@ -113,7 +113,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddAllColumnTypes() {
}
}

err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{
err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{
SrcTableName: tableName,
DstTableName: tableName,
AddedColumns: addedColumns,
Expand Down Expand Up @@ -144,7 +144,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddTrickyColumnNames() {
}
}

err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{
err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{
SrcTableName: tableName,
DstTableName: tableName,
AddedColumns: addedColumns,
Expand Down Expand Up @@ -175,7 +175,7 @@ func (s PostgresSchemaDeltaTestSuite) TestAddDropWhitespaceColumnNames() {
}
}

err = s.connector.ReplayTableSchemaDeltas(context.Background(), "schema_delta_flow", []*protos.TableSchemaDelta{{
err = s.connector.ReplayTableSchemaDeltas(context.Background(), nil, "schema_delta_flow", []*protos.TableSchemaDelta{{
SrcTableName: tableName,
DstTableName: tableName,
AddedColumns: addedColumns,
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func (c *PubSubConnector) CreateRawTable(ctx context.Context, req *protos.Create
return &protos.CreateRawTableOutput{TableIdentifier: "n/a"}, nil
}

func (c *PubSubConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
func (c *PubSubConnector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
) error {
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/s3/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (c *S3Connector) SyncQRepRecords(
schema := stream.Schema()

dstTableName := config.DestinationTableIdentifier
avroSchema, err := getAvroSchema(dstTableName, schema)
avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema)
if err != nil {
return 0, err
}
Expand All @@ -34,10 +34,12 @@ func (c *S3Connector) SyncQRepRecords(
}

func getAvroSchema(
ctx context.Context,
env map[string]string,
dstTableName string,
schema qvalue.QRecordSchema,
) (*model.QRecordAvroSchemaDefinition, error) {
avroSchema, err := model.GetAvroSchemaDefinition(dstTableName, schema, protos.DBType_S3)
avroSchema, err := model.GetAvroSchemaDefinition(ctx, env, dstTableName, schema, protos.DBType_S3)
if err != nil {
return nil, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ func (c *S3Connector) SyncRecords(ctx context.Context, req *model.SyncRecordsReq
}, nil
}

func (c *S3Connector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string, schemaDeltas []*protos.TableSchemaDelta) error {
func (c *S3Connector) ReplayTableSchemaDeltas(_ context.Context, _ map[string]string,
flowJobName string, schemaDeltas []*protos.TableSchemaDelta,
) error {
c.logger.Info("ReplayTableSchemaDeltas for S3 is a no-op")
return nil
}
Loading

0 comments on commit 5d2d6ff

Please sign in to comment.