Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

no peer config stored in temporal state #1844

Merged
merged 34 commits into main from no-tempo-peer-config
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3032172
no peer config in flow
serprex Jun 14, 2024
75a1e95
day 1
serprex Jun 15, 2024
b9dc8d4
rest of nexus
serprex Jun 15, 2024
16fa03f
golang lint fixed, now just a bunch of e2e errors...
serprex Jun 15, 2024
230cdbd
Working through ui updates
serprex Jun 16, 2024
59ecf6f
fix golint, probably need more CreatePeer calls in e2e, helpers shoul…
serprex Jun 17, 2024
c5fd5ba
Fix rest of ui error messages
serprex Jun 17, 2024
5e5d77b
clippy
serprex Jun 17, 2024
9d2f9a1
e2e helpers: CreatePeer
serprex Jun 17, 2024
ed7e3c1
Fix MaintainTx type mismatch, remove unused connector getters
serprex Jun 17, 2024
c94f896
TransactionSnapshot moved from peer config to qrep flow config
serprex Jun 18, 2024
3480f80
cleanup getTruePeer
serprex Jun 18, 2024
3f178ed
fix
serprex Jun 18, 2024
2f0de60
cleanup
serprex Jun 18, 2024
c5fdbcd
lint
serprex Jun 18, 2024
8327d95
Move peer generation out of helper
serprex Jun 18, 2024
98effb5
simplify snowflake e2e count methods
serprex Jun 18, 2024
1120038
Pass protos around less
serprex Jun 18, 2024
3f5312f
don't break proto backwards compat. TODO: fallback to deprecated fiel…
serprex Jun 19, 2024
bc6bf8a
pause/unpause upgrade protocol
serprex Jun 19, 2024
9c6286b
Also update mirror config in database
serprex Jun 20, 2024
f8ef5b6
Don't rename fields
serprex Jun 20, 2024
7c9f814
Convert for ui since ui lacks fallback logic & this'll send less cred…
serprex Jun 20, 2024
c30546f
update ui schema
serprex Jun 20, 2024
95594c3
console print debug
serprex Jun 21, 2024
ce323fe
more console, some cleanup
serprex Jun 21, 2024
7e52dc9
remove console.log
serprex Jun 21, 2024
f00dcb3
more lint fix
serprex Jun 21, 2024
348758f
Don't divide xmin from rest of qrep. Don't send whole config down for…
serprex Jun 21, 2024
8d1f911
simplify success & error
serprex Jun 21, 2024
622354a
Merge branch 'main' into no-tempo-peer-config
serprex Jun 22, 2024
9f8e86d
model toMap: encode to binary faster (#1865)
serprex Jun 24, 2024
6b4072d
disable nolintlint
serprex Jun 24, 2024
d4c020b
Merge branch 'main' into no-tempo-peer-config
serprex Jun 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ linters:
- misspell
- musttag
- nakedret
- nolintlint
# TODO bring back in 0.15 - nolintlint
- nonamedreturns
- perfsprint
- prealloc
Expand Down
223 changes: 70 additions & 153 deletions flow/activities/flowable.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
})
defer shutdown()

dstConn, err := connectors.GetAs[TSync](ctx, config.Destination)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to get destination connector: %w", err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
hasRecords := !recordBatchSync.WaitAndCheckEmpty()
logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

dstConn, err = connectors.GetAs[TSync](ctx, config.Destination)
dstConn, err = connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return nil, fmt.Errorf("failed to recreate destination connector: %w", err)
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
var res *model.SyncResponse
errGroup.Go(func() error {
syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)
if err != nil && config.Destination.Type != protos.DBType_EVENTHUBS {
serprex marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
syncBatchID += 1
Expand Down Expand Up @@ -332,14 +332,14 @@ func replicateQRepPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := log.With(activity.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))

srcConn, err := connectors.GetAs[TPull](ctx, config.SourcePeer)
srcConn, err := connectors.GetByNameAs[TPull](ctx, a.CatalogPool, config.SourceName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetAs[TSync](ctx, config.DestinationPeer)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to get qrep destination connector: %w", err)
Expand Down Expand Up @@ -431,13 +431,13 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn
logger := activity.GetLogger(ctx)

startTime := time.Now()
srcConn, err := connectors.GetAs[*connpostgres.PostgresConnector](ctx, config.SourcePeer)
srcConn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, a.CatalogPool, config.SourceName)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

dstConn, err := connectors.GetAs[TSync](ctx, config.DestinationPeer)
dstConn, err := connectors.GetByNameAs[TSync](ctx, a.CatalogPool, config.DestinationName)
if err != nil {
return 0, fmt.Errorf("failed to get qrep destination connector: %w", err)
}
Expand Down
25 changes: 11 additions & 14 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package activities

import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/sdk/activity"

"github.com/PeerDB-io/peer-flow/alerting"
Expand All @@ -29,6 +31,7 @@ type TxSnapshotState struct {

type SnapshotActivity struct {
Alerter *alerting.Alerter
CatalogPool *pgxpool.Pool
SlotSnapshotStates map[string]SlotSnapshotState
TxSnapshotStates map[string]TxSnapshotState
SnapshotStatesMutex sync.Mutex
Expand Down Expand Up @@ -56,16 +59,14 @@ func (a *SnapshotActivity) SetupReplication(
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := activity.GetLogger(ctx)

dbType := config.PeerConnectionConfig.Type
if dbType != protos.DBType_POSTGRES {
logger.Info(fmt.Sprintf("setup replication is no-op for %s", dbType))
return nil, nil
}

a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job")

conn, err := connectors.GetCDCPullConnector(ctx, config.PeerConnectionConfig)
conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, a.CatalogPool, config.PeerName)
if err != nil {
if errors.Is(err, errors.ErrUnsupported) {
logger.Info("setup replication is no-op for non-postgres source")
return nil, nil
}
return nil, fmt.Errorf("failed to get connector: %w", err)
}

Expand All @@ -80,14 +81,10 @@ func (a *SnapshotActivity) SetupReplication(
connectors.CloseConnector(ctx, conn)
}

// This now happens in a goroutine
go func() {
pgConn := conn.(*connpostgres.PostgresConnector)
err = pgConn.SetupReplication(ctx, slotSignal, config)
if err != nil {
if err := conn.SetupReplication(ctx, slotSignal, config); err != nil {
closeConnectionForError(err)
replicationErr <- err
return
}
}()

Expand Down Expand Up @@ -122,8 +119,8 @@ func (a *SnapshotActivity) SetupReplication(
}, nil
}

func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer *protos.Peer) error {
conn, err := connectors.GetCDCPullConnector(ctx, peer)
func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, peer string) error {
conn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, a.CatalogPool, peer)
if err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peer-flow/alerting"
"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -60,16 +61,16 @@ func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string,
) error {
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.SourceName)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
req.ConnectionConfigs.Source.Name, srcErr)
req.ConnectionConfigs.SourceName, srcErr)
}

destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.Destination.Name)
destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.DestinationName)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
req.ConnectionConfigs.Destination.Name, srcErr)
req.ConnectionConfigs.DestinationName, srcErr)
}

for _, v := range req.ConnectionConfigs.TableMappings {
Expand All @@ -92,14 +93,14 @@ func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
func (h *FlowRequestHandler) createQRepJobEntry(ctx context.Context,
req *protos.CreateQRepFlowRequest, workflowID string,
) error {
sourcePeerName := req.QrepConfig.SourcePeer.Name
sourcePeerName := req.QrepConfig.SourceName
sourcePeerID, _, srcErr := h.getPeerID(ctx, sourcePeerName)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
sourcePeerName, srcErr)
}

destinationPeerName := req.QrepConfig.DestinationPeer.Name
destinationPeerName := req.QrepConfig.DestinationName
destinationPeerID, _, dstErr := h.getPeerID(ctx, destinationPeerName)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
Expand Down Expand Up @@ -167,10 +168,7 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog(
ctx context.Context,
cfg *protos.FlowConnectionConfigs,
) error {
var cfgBytes []byte
var err error

cfgBytes, err = proto.Marshal(cfg)
cfgBytes, err := proto.Marshal(cfg)
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}
Expand Down Expand Up @@ -208,15 +206,18 @@ func (h *FlowRequestHandler) CreateQRepFlow(
},
}
if req.CreateCatalogEntry {
err := h.createQRepJobEntry(ctx, req, workflowID)
if err != nil {
if err := h.createQRepJobEntry(ctx, req, workflowID); err != nil {
slog.Error("unable to create flow job entry",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}
dbtype, err := connectors.LoadPeerType(ctx, h.pool, cfg.SourceName)
if err != nil {
return nil, err
}
var workflowFn interface{}
if cfg.SourcePeer.Type == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" {
if dbtype == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" {
workflowFn = peerflow.XminFlowWorkflow
} else {
workflowFn = peerflow.QRepFlowWorkflow
Expand All @@ -226,8 +227,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
cfg.SyncedAtColName = "_PEERDB_SYNCED_AT"
}

_, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, nil)
if err != nil {
if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, nil); err != nil {
slog.Error("unable to start QRepFlow workflow",
slog.Any("error", err), slog.String("flowName", cfg.FlowJobName))
return nil, fmt.Errorf("unable to start QRepFlow workflow: %w", err)
Expand Down
43 changes: 30 additions & 13 deletions flow/cmd/mirror_status.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//nolint:staticcheck // TODO remove in 0.15
package cmd

import (
Expand All @@ -10,6 +11,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/PeerDB-io/peer-flow/connectors"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
Expand Down Expand Up @@ -120,27 +122,45 @@ func (h *FlowRequestHandler) CDCFlowStatus(
return nil, err
}

// TODO remove in 0.15
// patching config to use new fields on ui
if config.Source != nil {
config.SourceName = config.Source.Name
config.Source = nil
}
if config.Destination != nil {
config.DestinationName = config.Destination.Name
config.Destination = nil
}

// patching config to show latest values from state
if state.SyncFlowOptions != nil {
config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
config.MaxBatchSize = state.SyncFlowOptions.BatchSize
config.TableMappings = state.SyncFlowOptions.TableMappings
}

var initialCopyStatus *protos.SnapshotStatus

cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName)
srcType, err := connectors.LoadPeerType(ctx, h.pool, config.SourceName)
if err != nil {
return nil, err
}
dstType, err := connectors.LoadPeerType(ctx, h.pool, config.DestinationName)
if err != nil {
return nil, err
}

initialCopyStatus = &protos.SnapshotStatus{
Clones: cloneStatuses,
cloneStatuses, err := h.cloneTableSummary(ctx, req.FlowJobName)
if err != nil {
return nil, err
}

return &protos.CDCMirrorStatus{
Config: config,
SnapshotStatus: initialCopyStatus,
Config: config,
SourceType: srcType,
DestinationType: dstType,
SnapshotStatus: &protos.SnapshotStatus{
Clones: cloneStatuses,
},
}, nil
}

Expand Down Expand Up @@ -316,16 +336,14 @@ func (h *FlowRequestHandler) getFlowConfigFromCatalog(
flowJobName string,
) (*protos.FlowConnectionConfigs, error) {
var configBytes sql.RawBytes
var err error
var config protos.FlowConnectionConfigs

err = h.pool.QueryRow(ctx,
err := h.pool.QueryRow(ctx,
"SELECT config_proto FROM flows WHERE name = $1", flowJobName).Scan(&configBytes)
if err != nil {
slog.Error("unable to query flow config from catalog", slog.Any("error", err))
return nil, fmt.Errorf("unable to query flow config from catalog: %w", err)
}

var config protos.FlowConnectionConfigs
err = proto.Unmarshal(configBytes, &config)
if err != nil {
slog.Error("unable to unmarshal flow config", slog.Any("error", err))
Expand Down Expand Up @@ -386,8 +404,7 @@ func (h *FlowRequestHandler) getCDCWorkflowState(ctx context.Context,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
}
var state peerflow.CDCFlowWorkflowState
err = res.Get(&state)
if err != nil {
if err := res.Get(&state); err != nil {
slog.Error(fmt.Sprintf("failed to get state in workflow with ID %s: %s", workflowID, err.Error()))
return nil,
fmt.Errorf("failed to get state in workflow with ID %s: %w", workflowID, err)
Expand Down
1 change: 1 addition & 0 deletions flow/cmd/snapshot_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func SnapshotWorkerMain(opts *SnapshotWorkerOptions) (client.Client, worker.Work
SlotSnapshotStates: make(map[string]activities.SlotSnapshotState),
TxSnapshotStates: make(map[string]activities.TxSnapshotState),
Alerter: alerting.NewAlerter(context.Background(), conn),
CatalogPool: conn,
})

return c, w, nil
Expand Down
14 changes: 11 additions & 3 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jackc/pgx/v5/pgtype"

"github.com/PeerDB-io/peer-flow/connectors"
connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres"
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
Expand Down Expand Up @@ -43,12 +44,19 @@ func (h *FlowRequestHandler) ValidateCDCMirror(
Ok: false,
}, errors.New("connection configs is nil")
}
sourcePeerConfig := req.ConnectionConfigs.Source.GetPostgresConfig()
sourcePeer, err := connectors.LoadPeer(ctx, h.pool, req.ConnectionConfigs.SourceName)
if err != nil {
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, err
}

sourcePeerConfig := sourcePeer.GetPostgresConfig()
if sourcePeerConfig == nil {
slog.Error("/validatecdc source peer config is nil", slog.Any("peer", req.ConnectionConfigs.Source))
slog.Error("/validatecdc source peer config is not postgres", slog.String("peer", req.ConnectionConfigs.SourceName))
return &protos.ValidateCDCMirrorResponse{
Ok: false,
}, errors.New("source peer config is nil")
}, errors.New("source peer config is not postgres")
}

pgPeer, err := connpostgres.NewPostgresConnector(ctx, sourcePeerConfig)
Expand Down
10 changes: 3 additions & 7 deletions flow/cmd/validate_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func (h *FlowRequestHandler) ValidatePeer(
Message: displayErr,
}, nil
}

defer conn.Close()

if req.Peer.Type == protos.DBType_POSTGRES {
Expand All @@ -58,10 +57,8 @@ func (h *FlowRequestHandler) ValidatePeer(
}
}

validationConn, ok := conn.(connectors.ValidationConnector)
if ok {
validErr := validationConn.ValidateCheck(ctx)
if validErr != nil {
if validationConn, ok := conn.(connectors.ValidationConnector); ok {
if validErr := validationConn.ValidateCheck(ctx); validErr != nil {
displayErr := fmt.Sprintf("failed to validate peer %s: %v", req.Peer.Name, validErr)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
displayErr,
Expand All @@ -73,8 +70,7 @@ func (h *FlowRequestHandler) ValidatePeer(
}
}

connErr := conn.ConnectionActive(ctx)
if connErr != nil {
if connErr := conn.ConnectionActive(ctx); connErr != nil {
displayErr := fmt.Sprintf("failed to establish active connection to %s peer %s: %v", req.Peer.Type, req.Peer.Name, connErr)
h.alerter.LogNonFlowWarning(ctx, telemetry.CreatePeer, req.Peer.Name,
displayErr,
Expand Down
Loading
Loading