Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jun 3, 2024
2 parents 4f19070 + 018918c commit bbb3d23
Show file tree
Hide file tree
Showing 16 changed files with 286 additions and 135 deletions.
5 changes: 5 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,11 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context,
partitions *protos.QRepPartitionBatch,
runUUID string,
) error {
shutdown := heartbeatRoutine(ctx, func() string {
return "replicating partitions for job"
})
defer shutdown()

ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
err := monitoring.UpdateStartTimeForQRepRun(ctx, a.CatalogPool, runUUID)
Expand Down
25 changes: 12 additions & 13 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"log/slog"
"strings"

_ "github.com/ClickHouse/clickhouse-go/v2"
_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand Down Expand Up @@ -66,10 +65,18 @@ func (c *ClickhouseConnector) CreateRawTable(ctx context.Context, req *protos.Cr
}, nil
}

// avroSyncMethod builds a ClickhouseAvroSyncMethod bound to this connector for
// the given flow job. The sync method is configured with the credentials
// provider's bucket path as staging path and with c.getRawTableName(flowJobName)
// as the destination table identifier.
func (c *ClickhouseConnector) avroSyncMethod(flowJobName string) *ClickhouseAvroSyncMethod {
	return NewClickhouseAvroSyncMethod(&protos.QRepConfig{
		StagingPath:                c.credsProvider.BucketPath,
		FlowJobName:                flowJobName,
		DestinationTableIdentifier: c.getRawTableName(flowJobName),
	}, c)
}

func (c *ClickhouseConnector) syncRecordsViaAvro(
ctx context.Context,
req *model.SyncRecordsRequest[model.RecordItems],
rawTableIdentifier string,
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
Expand All @@ -79,13 +86,8 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
}

qrepConfig := &protos.QRepConfig{
StagingPath: c.credsProvider.BucketPath,
FlowJobName: req.FlowJobName,
DestinationTableIdentifier: strings.ToLower(rawTableIdentifier),
}
avroSyncer := NewClickhouseAvroSyncMethod(qrepConfig, c)
numRecords, err := avroSyncer.SyncRecords(ctx, stream, req.FlowJobName)
avroSyncer := c.avroSyncMethod(req.FlowJobName)
numRecords, err := avroSyncer.SyncRecords(ctx, stream, req.FlowJobName, syncBatchID)
if err != nil {
return nil, err
}
Expand All @@ -105,10 +107,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
}

func (c *ClickhouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
rawTableName := c.getRawTableName(req.FlowJobName)
c.logger.Info("pushing records to Clickhouse table " + rawTableName)

res, err := c.syncRecordsViaAvro(ctx, req, rawTableName, req.SyncBatchID)
res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ClickhouseConnector struct {
logger log.Logger
config *protos.ClickhouseConfig
credsProvider *utils.ClickHouseS3Credentials
s3Stage *ClickHouseS3Stage
}

func ValidateS3(ctx context.Context, creds *utils.ClickHouseS3Credentials) error {
Expand Down Expand Up @@ -160,6 +161,7 @@ func NewClickhouseConnector(
config: config,
logger: logger,
credsProvider: &clickHouseS3CredentialsNew,
s3Stage: NewClickHouseS3Stage(),
}, nil
}

Expand Down
32 changes: 32 additions & 0 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ func (c *ClickhouseConnector) NormalizeRecords(ctx context.Context, req *model.N
}, nil
}

err = c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
}

destinationTableNames, err := c.getDistinctTableNamesInBatch(
ctx,
req.FlowJobName,
Expand Down Expand Up @@ -282,3 +287,30 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch(

return tableNames, nil
}

// copyAvroStageToDestination looks up the Avro stage recorded for the given
// flow job and sync batch and hands it to the Avro sync method's
// CopyStageToDestination.
//
// Once the stage has been fetched, its Cleanup is deferred and therefore runs
// on return whether or not the copy succeeded. Errors from either step are
// wrapped and returned.
func (c *ClickhouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error {
	syncMethod := c.avroSyncMethod(flowJobName)

	stagedFile, err := c.s3Stage.GetAvroStage(ctx, flowJobName, syncBatchID)
	if err != nil {
		return fmt.Errorf("failed to get avro stage: %w", err)
	}
	defer stagedFile.Cleanup()

	if err := syncMethod.CopyStageToDestination(ctx, stagedFile); err != nil {
		return fmt.Errorf("failed to copy stage to destination: %w", err)
	}
	return nil
}

// copyAvroStagesToDestination copies the Avro stage of every sync batch in the
// half-open range (normBatchID, syncBatchID] to the destination, in ascending
// batch order. It stops at the first failure and returns that error wrapped;
// when the range is empty it does nothing and returns nil.
func (c *ClickhouseConnector) copyAvroStagesToDestination(
	ctx context.Context, flowJobName string, normBatchID, syncBatchID int64,
) error {
	for batchID := normBatchID + 1; batchID <= syncBatchID; batchID++ {
		if err := c.copyAvroStageToDestination(ctx, flowJobName, batchID); err != nil {
			return fmt.Errorf("failed to copy avro stage to destination: %w", err)
		}
	}
	return nil
}
6 changes: 3 additions & 3 deletions flow/connectors/clickhouse/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords(
ctx context.Context,
stream *model.QRecordStream,
flowJobName string,
syncBatchID int64,
) (int, error) {
tableLog := slog.String("destinationTable", s.config.DestinationTableIdentifier)
dstTableName := s.config.DestinationTableIdentifier
Expand All @@ -82,11 +83,10 @@ func (s *ClickhouseAvroSyncMethod) SyncRecords(
return 0, err
}

defer avroFile.Cleanup()
s.connector.logger.Info(fmt.Sprintf("written %d records to Avro file", avroFile.NumRecords), tableLog)
err = s.CopyStageToDestination(ctx, avroFile)
err = s.connector.s3Stage.SetAvroStage(ctx, flowJobName, syncBatchID, avroFile)
if err != nil {
return 0, err
return 0, fmt.Errorf("failed to set avro stage: %w", err)
}

return avroFile.NumRecords, nil
Expand Down
83 changes: 83 additions & 0 deletions flow/connectors/clickhouse/s3_stage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package connclickhouse

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"

	utils "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
	"github.com/PeerDB-io/peer-flow/peerdbenv"
)

// ClickHouseS3Stage stores and retrieves Avro stage descriptors, keyed by flow
// job name and sync batch ID, in the ch_s3_stage table reached through the
// catalog connection pool. The type itself holds no state.
type ClickHouseS3Stage struct{}

// NewClickHouseS3Stage returns a pointer to a zero-value ClickHouseS3Stage.
func NewClickHouseS3Stage() *ClickHouseS3Stage {
	return new(ClickHouseS3Stage)
}

// SetAvroStage records avroFile, serialized as JSON, as the stage for the given
// flow job and sync batch in the ch_s3_stage table.
//
// The write is an upsert: if a row already exists for
// (flow_job_name, sync_batch_id), its avro_file is overwritten and its
// created_at is reset to CURRENT_TIMESTAMP. Marshalling, connection and
// execution errors are each wrapped and returned.
func (c *ClickHouseS3Stage) SetAvroStage(
	ctx context.Context,
	flowJobName string,
	syncBatchID int64,
	avroFile *utils.AvroFile,
) error {
	// Query text is kept exactly as it was written inline.
	const upsertStageSQL = `
INSERT INTO ch_s3_stage (flow_job_name, sync_batch_id, avro_file)
VALUES ($1, $2, $3)
ON CONFLICT (flow_job_name, sync_batch_id)
DO UPDATE SET avro_file = $3, created_at = CURRENT_TIMESTAMP
`

	payload, err := json.Marshal(avroFile)
	if err != nil {
		return fmt.Errorf("failed to marshal avro file: %w", err)
	}

	pool, err := c.getConn(ctx)
	if err != nil {
		return fmt.Errorf("failed to get connection: %w", err)
	}

	if _, err := pool.Exec(ctx, upsertStageSQL, flowJobName, syncBatchID, payload); err != nil {
		return fmt.Errorf("failed to set avro stage: %w", err)
	}
	return nil
}

// GetAvroStage loads the Avro stage descriptor stored in ch_s3_stage for the
// given flow job and sync batch and decodes it from JSON.
//
// It returns a dedicated "no avro stage found" error when no row matches, and
// wraps connection, query and decoding errors otherwise. The returned pointer
// is nil whenever the error is non-nil.
func (c *ClickHouseS3Stage) GetAvroStage(ctx context.Context, flowJobName string, syncBatchID int64) (*utils.AvroFile, error) {
	conn, err := c.getConn(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get connection: %w", err)
	}

	var avroFileJSON []byte
	err = conn.QueryRow(ctx, `
SELECT avro_file FROM ch_s3_stage
WHERE flow_job_name = $1 AND sync_batch_id = $2
`, flowJobName, syncBatchID).Scan(&avroFileJSON)
	if err != nil {
		// errors.Is rather than ==, so the missing-row case is still
		// recognised if the sentinel reaches us wrapped.
		if errors.Is(err, pgx.ErrNoRows) {
			return nil, fmt.Errorf("no avro stage found for flow job %s and sync batch %d", flowJobName, syncBatchID)
		}
		return nil, fmt.Errorf("failed to get avro stage: %w", err)
	}

	var avroFile utils.AvroFile
	if err := json.Unmarshal(avroFileJSON, &avroFile); err != nil {
		return nil, fmt.Errorf("failed to unmarshal avro file: %w", err)
	}

	return &avroFile, nil
}

// getConn returns the catalog connection pool obtained from
// peerdbenv.GetCatalogConnectionPoolFromEnv. On failure it returns a nil pool
// and the underlying error wrapped with context.
func (c *ClickHouseS3Stage) getConn(ctx context.Context) (*pgxpool.Pool, error) {
	pool, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
	if err == nil {
		return pool, nil
	}
	return nil, fmt.Errorf("unable to create catalog connection pool: %w", err)
}
6 changes: 6 additions & 0 deletions flow/connectors/snowflake/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (c *SnowflakeConnector) SyncQRepRecords(
partition *protos.QRepPartition,
stream *model.QRecordStream,
) (int, error) {
ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName)

// Ensure the destination table is available.
destTable := config.DestinationTableIdentifier
flowLog := slog.Group("sync_metadata",
Expand Down Expand Up @@ -71,6 +73,8 @@ func (c *SnowflakeConnector) getTableSchema(ctx context.Context, tableName strin
}

func (c *SnowflakeConnector) SetupQRepMetadataTables(ctx context.Context, config *protos.QRepConfig) error {
ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName)

var schemaExists sql.NullBool
err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, c.rawSchema).Scan(&schemaExists)
if err != nil {
Expand Down Expand Up @@ -169,6 +173,8 @@ func (c *SnowflakeConnector) createExternalStage(ctx context.Context, stageName
}

func (c *SnowflakeConnector) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error {
ctx = c.withMirrorNameQueryTag(ctx, config.FlowJobName)

destTable := config.DestinationTableIdentifier
stageName := c.getStageNameForJob(config.FlowJobName)

Expand Down
10 changes: 10 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,13 @@ func (c *SnowflakeConnector) ReplayTableSchemaDeltas(
return nil
}

// withMirrorNameQueryTag returns a context derived from ctx through
// gosnowflake.WithQueryTag, tagged "peerdb-mirror-" followed by mirrorName.
func (c *SnowflakeConnector) withMirrorNameQueryTag(ctx context.Context, mirrorName string) context.Context {
	queryTag := "peerdb-mirror-" + mirrorName
	return gosnowflake.WithQueryTag(ctx, queryTag)
}

func (c *SnowflakeConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName)

rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)
c.logger.Info("pushing records to Snowflake table " + rawTableIdentifier)

Expand Down Expand Up @@ -468,6 +474,7 @@ func (c *SnowflakeConnector) syncRecordsViaAvro(

// NormalizeRecords normalizes raw table to destination table.
func (c *SnowflakeConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (*model.NormalizeResponse, error) {
ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName)
normBatchID, err := c.GetLastNormalizeBatchID(ctx, req.FlowJobName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -583,6 +590,8 @@ func (c *SnowflakeConnector) mergeTablesForBatch(
}

func (c *SnowflakeConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
ctx = c.withMirrorNameQueryTag(ctx, req.FlowJobName)

var schemaExists sql.NullBool
err := c.database.QueryRowContext(ctx, checkIfSchemaExistsSQL, c.rawSchema).Scan(&schemaExists)
if err != nil {
Expand Down Expand Up @@ -625,6 +634,7 @@ func (c *SnowflakeConnector) CreateRawTable(ctx context.Context, req *protos.Cre
}

func (c *SnowflakeConnector) SyncFlowCleanup(ctx context.Context, jobName string) error {
ctx = c.withMirrorNameQueryTag(ctx, jobName)
err := c.PostgresMetadata.SyncFlowCleanup(ctx, jobName)
if err != nil {
return fmt.Errorf("[snowflake drop mirror] unable to clear metadata for sync flow cleanup: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ type peerDBOCFWriter struct {
}

type AvroFile struct {
FilePath string
StorageLocation AvroStorageLocation
NumRecords int
FilePath string `json:"filePath"`
StorageLocation AvroStorageLocation `json:"storageLocation"`
NumRecords int `json:"numRecords"`
}

func (l *AvroFile) Cleanup() {
Expand Down
22 changes: 11 additions & 11 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.1
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0
github.com/ClickHouse/clickhouse-go/v2 v2.24.0
github.com/ClickHouse/clickhouse-go/v2 v2.25.0
github.com/PeerDB-io/glua64 v1.0.1
github.com/PeerDB-io/gluabit32 v1.0.2
github.com/PeerDB-io/gluaflatbuffers v1.0.1
Expand Down Expand Up @@ -38,11 +38,11 @@ require (
github.com/klauspost/compress v1.17.8
github.com/lib/pq v1.10.9
github.com/linkedin/goavro/v2 v2.13.0
github.com/microsoft/go-mssqldb v1.7.1
github.com/microsoft/go-mssqldb v1.7.2
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/shopspring/decimal v1.4.0
github.com/slack-go/slack v0.13.0
github.com/snowflakedb/gosnowflake v1.10.0
github.com/snowflakedb/gosnowflake v1.10.1
github.com/stretchr/testify v1.9.0
github.com/twmb/franz-go v1.17.0
github.com/twmb/franz-go/plugin/kslog v1.0.0
Expand All @@ -62,14 +62,14 @@ require (
golang.org/x/crypto v0.23.0
golang.org/x/mod v0.17.0
golang.org/x/sync v0.7.0
google.golang.org/api v0.181.0
google.golang.org/genproto/googleapis/api v0.0.0-20240521202816-d264139d666e
google.golang.org/api v0.182.0
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
)

require (
cloud.google.com/go/auth v0.4.2 // indirect
cloud.google.com/go/auth v0.5.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
Expand All @@ -85,7 +85,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.11.2 // indirect
github.com/cockroachdb/errors v1.11.3 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
Expand All @@ -110,7 +110,7 @@ require (
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand Down Expand Up @@ -174,15 +174,15 @@ require (
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/exp v0.0.0-20240525044651-4c93da0ed11d
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto v0.0.0-20240521202816-d264139d666e // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect
google.golang.org/genproto v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit bbb3d23

Please sign in to comment.