Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Apr 8, 2024
2 parents f8758f9 + 55d5b3f commit bde2362
Show file tree
Hide file tree
Showing 20 changed files with 751 additions and 688 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches: [main, release/*]
pull_request:
branches: [main, release/*]
paths: [nexus/**, protos/**]

jobs:
build:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ui-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
branches: [main]
pull_request:
branches: [main]
paths: [ui/**]
paths: [ui/**, protos/**]

jobs:
build-test:
Expand Down
1 change: 1 addition & 0 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type APIServerParams struct {

// setupGRPCGatewayServer sets up the grpc-gateway mux
func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
//nolint:staticcheck
conn, err := grpc.DialContext(
context.Background(),
fmt.Sprintf("0.0.0.0:%d", args.Port),
Expand Down
7 changes: 5 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename

columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns))
for _, col := range renameRequest.TableSchema.Columns {
columnNames = append(columnNames, col.Name)
columnNames = append(columnNames, "`"+col.Name+"`")
}

if req.SoftDeleteColName != nil {
Expand All @@ -744,7 +744,10 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
allColsWithoutAlias := strings.Join(columnNames, ",")
allColsWithAlias := allColsBuilder.String()

pkeyCols := renameRequest.TableSchema.PrimaryKeyColumns
pkeyCols := make([]string, 0, len(renameRequest.TableSchema.PrimaryKeyColumns))
for _, pkeyCol := range renameRequest.TableSchema.PrimaryKeyColumns {
pkeyCols = append(pkeyCols, "`"+pkeyCol+"`")
}

c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

Expand Down
50 changes: 49 additions & 1 deletion flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

const (
Expand Down Expand Up @@ -135,8 +136,55 @@ func (c *ClickhouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return res, nil
}

func (c *ClickhouseConnector) ReplayTableSchemaDeltas(_ context.Context, flowJobName string,
func (c *ClickhouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJobName string,
schemaDeltas []*protos.TableSchemaDelta,
) error {
if len(schemaDeltas) == 0 {
return nil
}

tableSchemaModifyTx, err := c.database.Begin()
if err != nil {
return fmt.Errorf("error starting transaction for schema modification: %w",
err)
}
defer func() {
deferErr := tableSchemaModifyTx.Rollback()
if deferErr != sql.ErrTxDone && deferErr != nil {
c.logger.Error("error rolling back transaction for table schema modification", "error", deferErr)
}
}()

for _, schemaDelta := range schemaDeltas {
if schemaDelta == nil || len(schemaDelta.AddedColumns) == 0 {
continue
}

for _, addedColumn := range schemaDelta.AddedColumns {
clickhouseColType, err := qvalue.QValueKind(addedColumn.ColumnType).ToDWHColumnType(protos.DBType_CLICKHOUSE)
if err != nil {
return fmt.Errorf("failed to convert column type %s to clickhouse type: %w",
addedColumn.ColumnType, err)
}
_, err = tableSchemaModifyTx.ExecContext(ctx,
fmt.Sprintf("ALTER TABLE %s ADD COLUMN IF NOT EXISTS \"%s\" %s",
schemaDelta.DstTableName, addedColumn.ColumnName, clickhouseColType))
if err != nil {
return fmt.Errorf("failed to add column %s for table %s: %w", addedColumn.ColumnName,
schemaDelta.DstTableName, err)
}
c.logger.Info(fmt.Sprintf("[schema delta replay] added column %s with data type %s", addedColumn.ColumnName,
addedColumn.ColumnType),
"destination table name", schemaDelta.DstTableName,
"source table name", schemaDelta.SrcTableName)
}
}

err = tableSchemaModifyTx.Commit()
if err != nil {
return fmt.Errorf("failed to commit transaction for table schema modification: %w",
err)
}

return nil
}
9 changes: 7 additions & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,11 +755,16 @@ func (c *SnowflakeConnector) RenameTables(ctx context.Context, req *protos.Renam

columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns))
for _, col := range renameRequest.TableSchema.Columns {
columnNames = append(columnNames, col.Name)
columnNames = append(columnNames, SnowflakeIdentifierNormalize(col.Name))
}

pkeyColumnNames := make([]string, 0, len(renameRequest.TableSchema.PrimaryKeyColumns))
for _, col := range renameRequest.TableSchema.PrimaryKeyColumns {
pkeyColumnNames = append(pkeyColumnNames, SnowflakeIdentifierNormalize(col))
}

allCols := strings.Join(columnNames, ",")
pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",")
pkeyCols := strings.Join(pkeyColumnNames, ",")

c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dst))

Expand Down
104 changes: 52 additions & 52 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@ module github.com/PeerDB-io/peer-flow
go 1.22.0

require (
cloud.google.com/go v0.112.1
cloud.google.com/go/bigquery v1.59.1
cloud.google.com/go v0.112.2
cloud.google.com/go/bigquery v1.60.0
cloud.google.com/go/pubsub v1.37.0
cloud.google.com/go/storage v1.39.1
cloud.google.com/go/storage v1.40.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.0.4
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0
github.com/ClickHouse/clickhouse-go/v2 v2.22.4
github.com/ClickHouse/clickhouse-go/v2 v2.23.0
github.com/PeerDB-io/glua64 v1.0.1
github.com/PeerDB-io/gluabit32 v1.0.2
github.com/PeerDB-io/gluaflatbuffers v1.0.1
github.com/PeerDB-io/gluajson v1.0.2
github.com/PeerDB-io/gluamsgpack v1.0.2
github.com/PeerDB-io/gluautf8 v1.0.0
github.com/aws/aws-sdk-go-v2 v1.26.0
github.com/aws/aws-sdk-go-v2/config v1.27.9
github.com/aws/aws-sdk-go-v2/credentials v1.17.9
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.13
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.0
github.com/aws/aws-sdk-go-v2/service/ses v1.22.3
github.com/aws/aws-sdk-go-v2/service/sns v1.29.3
github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go-v2/config v1.27.11
github.com/aws/aws-sdk-go-v2/credentials v1.17.11
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1
github.com/aws/aws-sdk-go-v2/service/ses v1.22.4
github.com/aws/aws-sdk-go-v2/service/sns v1.29.4
github.com/cockroachdb/pebble v1.1.0
github.com/google/uuid v1.6.0
github.com/grafana/pyroscope-go v1.1.1
Expand All @@ -40,22 +40,22 @@ require (
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/shopspring/decimal v1.3.1
github.com/slack-go/slack v0.12.5
github.com/snowflakedb/gosnowflake v1.8.0
github.com/snowflakedb/gosnowflake v1.9.0
github.com/stretchr/testify v1.9.0
github.com/twmb/franz-go v1.16.1
github.com/twmb/franz-go/plugin/kslog v1.0.0
github.com/twpayne/go-geos v0.17.1
github.com/urfave/cli/v3 v3.0.0-alpha9
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a
github.com/yuin/gopher-lua v1.1.1
go.temporal.io/api v1.30.1
go.temporal.io/api v1.31.0
go.temporal.io/sdk v1.26.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.21.0
golang.org/x/sync v0.6.0
google.golang.org/api v0.171.0
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237
google.golang.org/grpc v1.62.1
golang.org/x/crypto v0.22.0
golang.org/x/sync v0.7.0
google.golang.org/api v0.172.0
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda
google.golang.org/grpc v1.63.0
google.golang.org/protobuf v1.33.0
)

Expand All @@ -66,13 +66,14 @@ require (
github.com/DataDog/zstd v1.5.5 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/apache/arrow/go/v14 v14.0.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.0 // indirect
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.3 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.5 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/errors v1.11.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
Expand All @@ -95,40 +96,40 @@ require (
github.com/paulmach/orb v0.11.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
github.com/prometheus/common v0.51.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.52.2 // indirect
github.com/prometheus/procfs v0.13.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/term v0.18.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.50.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.50.0 // indirect
go.opentelemetry.io/otel v1.25.0 // indirect
go.opentelemetry.io/otel/metric v1.25.0 // indirect
go.opentelemetry.io/otel/trace v1.25.0 // indirect
golang.org/x/term v0.19.0 // indirect
)

require (
cloud.google.com/go/compute v1.25.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.10.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.1 // indirect
github.com/Azure/go-amqp v1.0.5 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.4 // indirect
github.com/aws/smithy-go v1.20.1
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect
github.com/aws/smithy-go v1.20.2
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
Expand All @@ -143,7 +144,7 @@ require (
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.7+incompatible // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
Expand All @@ -165,17 +166,16 @@ require (
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/exp v0.0.0-20240318143956-a85f2c67cd81
golang.org/x/mod v0.16.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/exp v0.0.0-20240404231335-c0f41cb1a7a0
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
golang.org/x/tools v0.20.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit bde2362

Please sign in to comment.