Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jun 28, 2024
2 parents b7b4ccf + f864924 commit f032db5
Show file tree
Hide file tree
Showing 56 changed files with 2,179 additions and 1,506 deletions.
2 changes: 1 addition & 1 deletion flow/.golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ linters:
- misspell
- musttag
- nakedret
# TODO bring back in 0.15 - nolintlint
- nolintlint
- nonamedreturns
- perfsprint
- prealloc
Expand Down
51 changes: 13 additions & 38 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,13 @@ func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.Q
}

func (a *FlowableActivity) DropFlowSource(ctx context.Context, config *protos.ShutdownRequest) error {
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, a.CatalogPool, config.SourcePeer)
sourcePeerName, err := a.getPeerNameForMirror(ctx, config.FlowJobName, Source)
if err != nil {
return err
}

ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, a.CatalogPool, sourcePeerName)
if err != nil {
return fmt.Errorf("failed to get source connector: %w", err)
}
Expand All @@ -555,8 +561,13 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, config *protos.Sh
}

func (a *FlowableActivity) DropFlowDestination(ctx context.Context, config *protos.ShutdownRequest) error {
destinationPeerName, err := a.getPeerNameForMirror(ctx, config.FlowJobName, Destination)
if err != nil {
return err
}

ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, a.CatalogPool, config.DestinationPeer)
dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, a.CatalogPool, destinationPeerName)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
}
Expand Down Expand Up @@ -813,39 +824,3 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot
}
return err
}

// TODO remove in 0.15
func (a *FlowableActivity) UpdateCdcFlowConfigInCatalog(
ctx context.Context,
cfg *protos.FlowConnectionConfigs,
) error {
cfgBytes, err := proto.Marshal(cfg)
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}

_, err = a.CatalogPool.Exec(ctx, "UPDATE flows SET config_proto = $1 WHERE name = $2", cfgBytes, cfg.FlowJobName)
if err != nil {
return fmt.Errorf("unable to update flow config in catalog: %w", err)
}

return nil
}

// TODO remove in 0.15
func (a *FlowableActivity) UpdateQRepFlowConfigInCatalog(
ctx context.Context,
cfg *protos.FlowConnectionConfigs,
) error {
cfgBytes, err := proto.Marshal(cfg)
if err != nil {
return fmt.Errorf("unable to marshal flow config: %w", err)
}

_, err = a.CatalogPool.Exec(ctx, "UPDATE flows SET config_proto = $1 WHERE name = $2", cfgBytes, cfg.FlowJobName)
if err != nil {
return fmt.Errorf("unable to update flow config in catalog: %w", err)
}

return nil
}
40 changes: 34 additions & 6 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

type PeerType string

const (
Source PeerType = "source"
Destination PeerType = "destination"
)

func heartbeatRoutine(
ctx context.Context,
message func() string,
Expand Down Expand Up @@ -284,21 +291,26 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon

func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*protos.Peer, error) {
optionRows, err := a.CatalogPool.Query(ctx, `
SELECT DISTINCT p.name, p.options
FROM peers p
JOIN flows f ON p.id = f.source_peer
WHERE p.type = $1`, protos.DBType_POSTGRES)
SELECT p.name, p.options, p.enc_key_id
FROM peers p
WHERE p.type = $1 AND EXISTS(SELECT * FROM flows f ON p.id = f.source_peer)`, protos.DBType_POSTGRES)
if err != nil {
return nil, err
}

return pgx.CollectRows(optionRows, func(row pgx.CollectableRow) (*protos.Peer, error) {
var peerName string
var peerOptions []byte
err := optionRows.Scan(&peerName, &peerOptions)
var encPeerOptions []byte
var encKeyID string
if err := optionRows.Scan(&peerName, &encPeerOptions, &encKeyID); err != nil {
return nil, err
}

peerOptions, err := connectors.DecryptPeerOptions(encKeyID, encPeerOptions)
if err != nil {
return nil, err
}

var pgPeerConfig protos.PostgresConfig
unmarshalErr := proto.Unmarshal(peerOptions, &pgPeerConfig)
if unmarshalErr != nil {
Expand Down Expand Up @@ -528,3 +540,19 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn

return currentSnapshotXmin, nil
}

func (a *FlowableActivity) getPeerNameForMirror(ctx context.Context, flowName string, peerType PeerType) (string, error) {
peerClause := "source_peer"
if peerType == Destination {
peerClause = "destination_peer"
}
q := fmt.Sprintf("SELECT p.name FROM flows f JOIN peers p ON f.%s = p.id WHERE f.name = $1;", peerClause)
var peerName string
err := a.CatalogPool.QueryRow(ctx, q, flowName).Scan(&peerName)
if err != nil {
slog.Error("failed to get peer name for flow", slog.String("flow_name", flowName), slog.Any("error", err))
return "", fmt.Errorf("failed to get peer name for flow %s: %w", flowName, err)
}

return peerName, nil
}
99 changes: 95 additions & 4 deletions flow/cmd/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/google/uuid"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"google.golang.org/grpc"
Expand All @@ -37,6 +39,93 @@ type APIServerParams struct {
GatewayPort uint16
}

type RecryptItem struct {
options []byte
id int32
}

func recryptDatabase(ctx context.Context, catalogPool *pgxpool.Pool) {
newKeyID := peerdbenv.PeerDBCurrentEncKeyID()
keys := peerdbenv.PeerDBEncKeys()
if newKeyID == "" {
if len(keys) == 0 {
slog.Warn("Encryption disabled. This is not recommended.")
} else {
slog.Warn("Encryption disabled, decrypting any currently encrypted configs. This is not recommended.")
}
}

key, err := keys.Get(newKeyID)
if err != nil {
slog.Warn("recrypt failed to find key, skipping", slog.Any("error", err))
return
}

tx, err := catalogPool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
slog.Warn("recrypt failed to start transaction, skipping", slog.Any("error", err))
return
}
defer shared.RollbackTx(tx, slog.Default())

rows, err := tx.Query(ctx, "SELECT id, options, enc_key_id FROM peers WHERE enc_key_id <> $1 FOR UPDATE", newKeyID)
if err != nil {
slog.Warn("recrypt failed to query, skipping", slog.Any("error", err))
return
}
var todo []RecryptItem
var id int32
var options []byte
var oldKeyID string
for rows.Next() {
if err := rows.Scan(&id, &options, &oldKeyID); err != nil {
slog.Warn("recrypt failed to scan, skipping", slog.Any("error", err))
continue
}

oldKey, err := keys.Get(oldKeyID)
if err != nil {
slog.Warn("recrypt failed to find key, skipping", slog.Any("error", err), slog.String("enc_key_id", oldKeyID))
continue
}

if oldKey != nil {
options, err = oldKey.Decrypt(options)
if err != nil {
slog.Warn("recrypt failed to decrypt, skipping", slog.Any("error", err), slog.Int64("id", int64(id)))
continue
}
}

if key != nil {
options, err = key.Encrypt(options)
if err != nil {
slog.Warn("recrypt failed to encrypt, skipping", slog.Any("error", err))
continue
}
}

slog.Info("recrypting peer", slog.Int64("id", int64(id)), slog.String("oldKey", oldKeyID), slog.String("newKey", newKeyID))
todo = append(todo, RecryptItem{id: id, options: options})
}
if err := rows.Err(); err != nil {
slog.Warn("recrypt iteration failed, skipping", slog.Any("error", err))
return
}

for _, item := range todo {
if _, err := tx.Exec(ctx, "UPDATE peers SET options = $2, enc_key_id = $3 WHERE id = $1", item.id, item.options, newKeyID); err != nil {
slog.Warn("recrypt failed to update, ignoring", slog.Any("error", err), slog.Int64("id", int64(item.id)))
return
}
}

if err := tx.Commit(ctx); err != nil {
slog.Warn("recrypt failed to commit transaction, skipping", slog.Any("error", err))
}
slog.Info("recrypt finished")
}

// setupGRPCGatewayServer sets up the grpc-gateway mux
func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
conn, err := grpc.NewClient(
Expand Down Expand Up @@ -117,13 +206,13 @@ func APIMain(ctx context.Context, args *APIServerParams) error {

grpcServer := grpc.NewServer()

catalogConn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
catalogPool, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx)
if err != nil {
return fmt.Errorf("unable to get catalog connection pool: %w", err)
}

taskQueue := peerdbenv.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
flowHandler := NewFlowRequestHandler(tc, catalogConn, taskQueue)
flowHandler := NewFlowRequestHandler(tc, catalogPool, taskQueue)

err = killExistingScheduleFlows(ctx, tc, args.TemporalNamespace, taskQueue)
if err != nil {
Expand Down Expand Up @@ -168,13 +257,15 @@ func APIMain(ctx context.Context, args *APIServerParams) error {

slog.Info(fmt.Sprintf("Starting API gateway on port %d", args.GatewayPort))
go func() {
if err := gateway.ListenAndServe(); err != nil {
if err := gateway.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("failed to serve http: %v", err)
}
}()

<-ctx.Done()
// somewhat unrelated here, but needed a process which isn't replicated
go recryptDatabase(ctx, catalogPool)

<-ctx.Done()
grpcServer.GracefulStop()
slog.Info("Server has been shut down gracefully. Exiting...")

Expand Down
Loading

0 comments on commit f032db5

Please sign in to comment.