Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jul 15, 2024
2 parents 09c1769 + 514e0f6 commit f499201
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 324 deletions.
8 changes: 6 additions & 2 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (h *FlowRequestHandler) getPartitionStatuses(
ctx context.Context,
flowJobName string,
) ([]*protos.PartitionStatus, error) {
q := "SELECT start_time,end_time,rows_in_partition,rows_synced FROM peerdb_stats.qrep_partitions WHERE flow_name=$1"
q := "SELECT partition_uuid,start_time,end_time,rows_in_partition,rows_synced FROM peerdb_stats.qrep_partitions WHERE flow_name=$1"
rows, err := h.pool.Query(ctx, q, flowJobName)
if err != nil {
slog.Error(fmt.Sprintf("unable to query qrep partition - %s: %s", flowJobName, err.Error()))
Expand All @@ -360,19 +360,23 @@ func (h *FlowRequestHandler) getPartitionStatuses(
defer rows.Close()

res := []*protos.PartitionStatus{}
var partitionId pgtype.Text
var startTime pgtype.Timestamp
var endTime pgtype.Timestamp
var numRowsInPartition pgtype.Int8
var numRowsSynced pgtype.Int8

for rows.Next() {
if err := rows.Scan(&startTime, &endTime, &numRowsInPartition, &numRowsSynced); err != nil {
if err := rows.Scan(&partitionId, &startTime, &endTime, &numRowsInPartition, &numRowsSynced); err != nil {
slog.Error(fmt.Sprintf("unable to scan qrep partition - %s: %s", flowJobName, err.Error()))
return nil, fmt.Errorf("unable to scan qrep partition - %s: %w", flowJobName, err)
}

partitionStatus := &protos.PartitionStatus{}

if partitionId.Valid {
partitionStatus.PartitionId = partitionId.String
}
if startTime.Valid {
partitionStatus.StartTime = timestamppb.New(startTime.Time)
}
Expand Down
5 changes: 3 additions & 2 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ func (c *ClickhouseConnector) RenameTables(ctx context.Context, req *protos.Rena
for _, renameRequest := range req.RenameTableOptions {
if req.SyncedAtColName != "" {
syncedAtCol := strings.ToLower(req.SyncedAtColName)
_, err := c.execWithLogging(ctx, fmt.Sprintf("ALTER TABLE %s UPDATE %s=now() WHERE true",
renameRequest.CurrentName, syncedAtCol))
_, err := c.execWithLogging(ctx,
fmt.Sprintf("ALTER TABLE %s UPDATE %s=now() WHERE true SETTINGS allow_nondeterministic_mutations=1",
renameRequest.CurrentName, syncedAtCol))
if err != nil {
return nil, fmt.Errorf("unable to set synced at column for table %s: %w",
renameRequest.CurrentName, err)
Expand Down
56 changes: 28 additions & 28 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ require (
github.com/PeerDB-io/gluajson v1.0.2
github.com/PeerDB-io/gluamsgpack v1.0.4
github.com/PeerDB-io/gluautf8 v1.0.0
github.com/aws/aws-sdk-go-v2 v1.30.1
github.com/aws/aws-sdk-go-v2/config v1.27.24
github.com/aws/aws-sdk-go-v2/credentials v1.17.24
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.5
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.0
github.com/aws/aws-sdk-go-v2/service/ses v1.24.1
github.com/aws/aws-sdk-go-v2/service/sns v1.31.1
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/config v1.27.26
github.com/aws/aws-sdk-go-v2/credentials v1.17.26
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.7
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2
github.com/aws/aws-sdk-go-v2/service/ses v1.25.2
github.com/aws/aws-sdk-go-v2/service/sns v1.31.3
github.com/aws/smithy-go v1.20.3
github.com/cockroachdb/pebble v1.1.1
github.com/elastic/go-elasticsearch/v8 v8.14.0
Expand Down Expand Up @@ -57,32 +57,32 @@ require (
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.temporal.io/api v1.35.0
go.temporal.io/api v1.36.0
go.temporal.io/sdk v1.27.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.25.0
golang.org/x/mod v0.19.0
golang.org/x/sync v0.7.0
google.golang.org/api v0.187.0
google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094
google.golang.org/api v0.188.0
google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
)

require (
cloud.google.com/go/auth v0.6.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/auth v0.7.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.3 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
github.com/ClickHouse/ch-go v0.61.5 // indirect
github.com/ClickHouse/ch-go v0.62.0 // indirect
github.com/DataDog/zstd v1.5.5 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.9 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.22.3 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.30.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -91,7 +91,7 @@ require (
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/danieljoos/wincred v1.2.1 // indirect
github.com/danieljoos/wincred v1.2.2 // indirect
github.com/dvsekhvalnov/jose2go v1.7.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand Down Expand Up @@ -126,22 +126,22 @@ require (
)

require (
cloud.google.com/go/compute/metadata v0.4.0 // indirect
cloud.google.com/go/iam v1.1.10 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/iam v1.1.11 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.12.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.9.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 // indirect
github.com/Azure/go-amqp v1.0.5 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.13 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.15 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.13 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
Expand Down Expand Up @@ -185,7 +185,7 @@ require (
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.23.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
google.golang.org/genproto v0.0.0-20240711142825-46eb208f015d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit f499201

Please sign in to comment.