Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jul 12, 2024
2 parents f032db5 + 69d2eaf commit bf3ef34
Show file tree
Hide file tree
Showing 152 changed files with 3,292 additions and 3,238 deletions.
6 changes: 4 additions & 2 deletions .github/actions/genprotos/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ runs:
- uses: actions/checkout@v4
- name: check cache
id: cache
uses: actions/cache@v4
uses: ubicloud/cache@v4
with:
path: |
./flow/generated/protos
Expand All @@ -17,10 +17,12 @@ runs:
- if: steps.cache.outputs.cache-hit != 'true'
uses: actions/setup-go@v5
with:
go-version: '1.22.3'
go-version: '1.22.5'
cache: false
- if: steps.cache.outputs.cache-hit != 'true'
uses: bufbuild/[email protected]
with:
github_token: ${{ github.token }}
- if: steps.cache.outputs.cache-hit != 'true'
uses: dtolnay/rust-toolchain@stable
- if: steps.cache.outputs.cache-hit != 'true'
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ jobs:
with:
version: "latest"

- name: start clickhouse
uses: getsentry/action-clickhouse-in-ci@v1

- name: Install Temporal CLI
uses: temporalio/setup-temporal@v0

Expand Down
27 changes: 8 additions & 19 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ func (a *FlowableActivity) StartNormalize(
res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SyncBatchID: input.SyncBatchID,
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
TableNameSchemaMapping: input.TableNameSchemaMapping,
Expand Down Expand Up @@ -544,36 +543,26 @@ func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.Q
return dst.CleanupQRepFlow(ctx, config)
}

func (a *FlowableActivity) DropFlowSource(ctx context.Context, config *protos.ShutdownRequest) error {
sourcePeerName, err := a.getPeerNameForMirror(ctx, config.FlowJobName, Source)
if err != nil {
return err
}

ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, a.CatalogPool, sourcePeerName)
func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropFlowActivityInput) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, a.CatalogPool, req.PeerName)
if err != nil {
return fmt.Errorf("failed to get source connector: %w", err)
}
defer connectors.CloseConnector(ctx, srcConn)

return srcConn.PullFlowCleanup(ctx, config.FlowJobName)
return srcConn.PullFlowCleanup(ctx, req.FlowJobName)
}

func (a *FlowableActivity) DropFlowDestination(ctx context.Context, config *protos.ShutdownRequest) error {
destinationPeerName, err := a.getPeerNameForMirror(ctx, config.FlowJobName, Destination)
if err != nil {
return err
}

ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, a.CatalogPool, destinationPeerName)
func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error {
ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
dstConn, err := connectors.GetByNameAs[connectors.CDCSyncConnector](ctx, a.CatalogPool, req.PeerName)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

return dstConn.SyncFlowCleanup(ctx, config.FlowJobName)
return dstConn.SyncFlowCleanup(ctx, req.FlowJobName)
}

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
Expand Down
20 changes: 2 additions & 18 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
optionRows, err := a.CatalogPool.Query(ctx, `
SELECT p.name, p.options, p.enc_key_id
FROM peers p
WHERE p.type = $1 AND EXISTS(SELECT * FROM flows f ON p.id = f.source_peer)`, protos.DBType_POSTGRES)
WHERE p.type = $1 AND EXISTS(SELECT * FROM flows f WHERE p.id = f.source_peer)`, protos.DBType_POSTGRES)
if err != nil {
return nil, err
}
Expand All @@ -306,7 +306,7 @@ func (a *FlowableActivity) getPostgresPeerConfigs(ctx context.Context) ([]*proto
return nil, err
}

peerOptions, err := connectors.DecryptPeerOptions(encKeyID, encPeerOptions)
peerOptions, err := peerdbenv.Decrypt(encKeyID, encPeerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -540,19 +540,3 @@ func replicateXminPartition[TRead any, TWrite any, TSync connectors.QRepSyncConn

return currentSnapshotXmin, nil
}

func (a *FlowableActivity) getPeerNameForMirror(ctx context.Context, flowName string, peerType PeerType) (string, error) {
peerClause := "source_peer"
if peerType == Destination {
peerClause = "destination_peer"
}
q := fmt.Sprintf("SELECT p.name FROM flows f JOIN peers p ON f.%s = p.id WHERE f.name = $1;", peerClause)
var peerName string
err := a.CatalogPool.QueryRow(ctx, q, flowName).Scan(&peerName)
if err != nil {
slog.Error("failed to get peer name for flow", slog.String("flow_name", flowName), slog.Any("error", err))
return "", fmt.Errorf("failed to get peer name for flow %s: %w", flowName, err)
}

return peerName, nil
}
49 changes: 29 additions & 20 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,38 @@ type AlertSenderConfig struct {

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderConfig, error) {
rows, err := a.catalogPool.Query(ctx,
"SELECT id,service_type,service_config FROM peerdb_stats.alerting_config")
"SELECT id,service_type,service_config,enc_key_id FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
}

var alertSenderConfigs []AlertSenderConfig
var serviceType ServiceType
var serviceConfig string
var id int64
_, err = pgx.ForEachRow(rows, []any{&id, &serviceType, &serviceConfig}, func() error {
keys := peerdbenv.PeerDBEncKeys()
return pgx.CollectRows(rows, func(row pgx.CollectableRow) (AlertSenderConfig, error) {
var id int64
var serviceType ServiceType
var serviceConfigEnc []byte
var encKeyId string
if err := row.Scan(&id, &serviceType, &serviceConfigEnc, &encKeyId); err != nil {
return AlertSenderConfig{}, err
}

key, err := keys.Get(encKeyId)
if err != nil {
return AlertSenderConfig{}, err
}
serviceConfig, err := key.Decrypt(serviceConfigEnc)
if err != nil {
return AlertSenderConfig{}, err
}

switch serviceType {
case SLACK:
var slackServiceConfig slackAlertConfig
err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
if err := json.Unmarshal(serviceConfig, &slackServiceConfig); err != nil {
return AlertSenderConfig{}, fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}

alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: newSlackAlertSender(&slackServiceConfig)})
return AlertSenderConfig{Id: id, Sender: newSlackAlertSender(&slackServiceConfig)}, nil
case EMAIL:
var replyToAddresses []string
if replyToEnvString := strings.TrimSpace(
Expand All @@ -62,11 +75,10 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderCon
replyToAddresses: replyToAddresses,
}
if emailServiceConfig.sourceEmail == "" {
return errors.New("missing sourceEmail for Email alerting service")
return AlertSenderConfig{}, errors.New("missing sourceEmail for Email alerting service")
}
err = json.Unmarshal([]byte(serviceConfig), &emailServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
if err := json.Unmarshal(serviceConfig, &emailServiceConfig); err != nil {
return AlertSenderConfig{}, fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}
var region *string
if envRegion := peerdbenv.PeerDBAlertingEmailSenderRegion(); envRegion != "" {
Expand All @@ -75,16 +87,13 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderCon

alertSender, alertSenderErr := NewEmailAlertSenderWithNewClient(ctx, region, &emailServiceConfig)
if alertSenderErr != nil {
return fmt.Errorf("failed to initialize email alerter: %w", alertSenderErr)
return AlertSenderConfig{}, fmt.Errorf("failed to initialize email alerter: %w", alertSenderErr)
}
alertSenderConfigs = append(alertSenderConfigs, AlertSenderConfig{Id: id, Sender: alertSender})
return AlertSenderConfig{Id: id, Sender: alertSender}, nil
default:
return fmt.Errorf("unknown service type: %s", serviceType)
return AlertSenderConfig{}, fmt.Errorf("unknown service type: %s", serviceType)
}
return nil
})

return alertSenderConfigs, nil
}

// doesn't take care of closing pool, needs to be done externally.
Expand Down
83 changes: 83 additions & 0 deletions flow/cmd/alerts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package cmd

import (
"context"

"github.com/jackc/pgx/v5"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)

func (h *FlowRequestHandler) GetAlertConfigs(ctx context.Context, req *protos.GetAlertConfigsRequest) (*protos.GetAlertConfigsResponse, error) {
rows, err := h.pool.Query(ctx, "select id, service_type, service_config, enc_key_id from peerdb_stats.alerting_config")
if err != nil {
return nil, err
}

configs, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.AlertConfig, error) {
var serviceConfigPayload []byte
var encKeyID string
config := &protos.AlertConfig{}
if err := row.Scan(&config.Id, &config.ServiceType, &serviceConfigPayload, &encKeyID); err != nil {
return nil, err
}
serviceConfig, err := peerdbenv.Decrypt(encKeyID, serviceConfigPayload)
if err != nil {
return nil, err
}
config.ServiceConfig = string(serviceConfig)
return config, nil
})
if err != nil {
return nil, err
}

return &protos.GetAlertConfigsResponse{Configs: configs}, nil
}

func (h *FlowRequestHandler) PostAlertConfig(ctx context.Context, req *protos.PostAlertConfigRequest) (*protos.PostAlertConfigResponse, error) {
key, err := peerdbenv.PeerDBCurrentEncKey()
if err != nil {
return nil, err
}
serviceConfig, err := key.Encrypt(shared.UnsafeFastStringToReadOnlyBytes(req.ServiceConfig))
if err != nil {
return nil, err
}

if req.Id == -1 {
var id int32
if err := h.pool.QueryRow(
ctx,
"insert into peerdb_stats.alerting_config (service_type, service_config, enc_key_id) values ($1, $2, $3) returning id",
req.ServiceType,
serviceConfig,
key.ID,
).Scan(&id); err != nil {
return nil, err
}
return &protos.PostAlertConfigResponse{Id: id}, nil
} else if _, err := h.pool.Exec(
ctx,
"update peerdb_stats.alerting_config set service_type = $1, service_config = $2, enc_key_id = $3 where id = $4",
req.ServiceType,
serviceConfig,
key.ID,
req.Id,
); err != nil {
return nil, err
}
return &protos.PostAlertConfigResponse{Id: req.Id}, nil
}

func (h *FlowRequestHandler) DeleteAlertConfig(
ctx context.Context,
req *protos.DeleteAlertConfigRequest,
) (*protos.DeleteAlertConfigResponse, error) {
if _, err := h.pool.Exec(ctx, "delete from peerdb_stats.alerting_config where id = $1", req.Id); err != nil {
return nil, err
}
return &protos.DeleteAlertConfigResponse{}, nil
}
Loading

0 comments on commit bf3ef34

Please sign in to comment.