Skip to content

Commit

Permalink
Merge branch 'main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 16, 2024
2 parents 6918fc2 + 60fb14e commit 11fd8cf
Show file tree
Hide file tree
Showing 47 changed files with 1,226 additions and 661 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golang-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v6
with:
version: v1.60
version: v1.61
working-directory: ./flow
args: --timeout=10m
3 changes: 3 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ services:
dockerfile: stacks/peerdb-ui.Dockerfile
ports:
- 3000:3000
env_file:
- path: ./.env
required: false
environment:
<<: *catalog-config
DATABASE_URL: postgres://postgres:postgres@catalog:5432/postgres
Expand Down
19 changes: 5 additions & 14 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,19 +361,6 @@ func (a *FlowableActivity) StartNormalize(
return nil, fmt.Errorf("failed to normalized records: %w", err)
}

// normalize flow did not run due to no records, no need to update end time.
if res.Done {
err = monitoring.UpdateEndTimeForCDCBatch(
ctx,
a.CatalogPool,
input.FlowConnectionConfigs.FlowJobName,
res.EndBatchID,
)
if err != nil {
return nil, err
}
}

// log the number of batches normalized
logger.Info(fmt.Sprintf("normalized records from batch %d to batch %d",
res.StartBatchID, res.EndBatchID))
Expand Down Expand Up @@ -702,7 +689,11 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
slotMetricGauges.OpenReplicationConnectionsGauge = openReplicationConnectionsGauge
}

if err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, slotName, peerName, slotMetricGauges); err != nil {
if err := srcConn.HandleSlotInfo(ctx, a.Alerter, a.CatalogPool, &alerting.AlertKeys{
FlowName: config.FlowJobName,
PeerName: peerName,
SlotName: slotName,
}, slotMetricGauges); err != nil {
logger.Error("Failed to handle slot info", slog.Any("error", err))
}
}()
Expand Down
69 changes: 48 additions & 21 deletions flow/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"log/slog"
"slices"
"strings"
"time"

Expand All @@ -25,44 +26,59 @@ type Alerter struct {
}

type AlertSenderConfig struct {
Sender AlertSender
Id int64
Sender AlertSender
AlertForMirrors []string
Id int64
}

// AlertKeys bundles the identifiers that describe what an alert is about.
//
// Callers populate it with the flow (mirror) name, the peer name and the
// replication slot name. The alerting functions use PeerName when building
// the alert key and message, and compare FlowName against a sender's
// AlertForMirrors list to decide whether that sender should be notified.
type AlertKeys struct {
	FlowName, PeerName, SlotName string
}

func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderConfig, error) {
rows, err := a.catalogPool.Query(ctx,
"SELECT id,service_type,service_config,enc_key_id FROM peerdb_stats.alerting_config")
`SELECT
id,
service_type,
service_config,
enc_key_id,
alert_for_mirrors
FROM peerdb_stats.alerting_config`)
if err != nil {
return nil, fmt.Errorf("failed to read alerter config from catalog: %w", err)
}

keys := peerdbenv.PeerDBEncKeys()
return pgx.CollectRows(rows, func(row pgx.CollectableRow) (AlertSenderConfig, error) {
var id int64
var alertSenderConfig AlertSenderConfig
var serviceType ServiceType
var serviceConfigEnc []byte
var encKeyId string
if err := row.Scan(&id, &serviceType, &serviceConfigEnc, &encKeyId); err != nil {
return AlertSenderConfig{}, err
if err := row.Scan(&alertSenderConfig.Id, &serviceType, &serviceConfigEnc, &encKeyId,
&alertSenderConfig.AlertForMirrors); err != nil {
return alertSenderConfig, err
}

key, err := keys.Get(encKeyId)
if err != nil {
return AlertSenderConfig{}, err
return alertSenderConfig, err
}
serviceConfig, err := key.Decrypt(serviceConfigEnc)
if err != nil {
return AlertSenderConfig{}, err
return alertSenderConfig, err
}

switch serviceType {
case SLACK:
var slackServiceConfig slackAlertConfig
if err := json.Unmarshal(serviceConfig, &slackServiceConfig); err != nil {
return AlertSenderConfig{}, fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
return alertSenderConfig, fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}

return AlertSenderConfig{Id: id, Sender: newSlackAlertSender(&slackServiceConfig)}, nil
alertSenderConfig.Sender = newSlackAlertSender(&slackServiceConfig)
return alertSenderConfig, nil
case EMAIL:
var replyToAddresses []string
if replyToEnvString := strings.TrimSpace(
Expand All @@ -75,10 +91,10 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderCon
replyToAddresses: replyToAddresses,
}
if emailServiceConfig.sourceEmail == "" {
return AlertSenderConfig{}, errors.New("missing sourceEmail for Email alerting service")
return alertSenderConfig, errors.New("missing sourceEmail for Email alerting service")
}
if err := json.Unmarshal(serviceConfig, &emailServiceConfig); err != nil {
return AlertSenderConfig{}, fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
return alertSenderConfig, fmt.Errorf("failed to unmarshal %s service config: %w", serviceType, err)
}
var region *string
if envRegion := peerdbenv.PeerDBAlertingEmailSenderRegion(); envRegion != "" {
Expand All @@ -89,9 +105,11 @@ func (a *Alerter) registerSendersFromPool(ctx context.Context) ([]AlertSenderCon
if alertSenderErr != nil {
return AlertSenderConfig{}, fmt.Errorf("failed to initialize email alerter: %w", alertSenderErr)
}
return AlertSenderConfig{Id: id, Sender: alertSender}, nil
alertSenderConfig.Sender = alertSender

return alertSenderConfig, nil
default:
return AlertSenderConfig{}, fmt.Errorf("unknown service type: %s", serviceType)
return alertSenderConfig, fmt.Errorf("unknown service type: %s", serviceType)
}
})
}
Expand Down Expand Up @@ -119,7 +137,7 @@ func NewAlerter(ctx context.Context, catalogPool *pgxpool.Pool) *Alerter {
}
}

func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo *protos.SlotInfo) {
func (a *Alerter) AlertIfSlotLag(ctx context.Context, alertKeys *AlertKeys, slotInfo *protos.SlotInfo) {
alertSenderConfigs, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set alert senders", slog.Any("error", err))
Expand All @@ -144,12 +162,16 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
}
}

alertKey := fmt.Sprintf("%s Slot Lag Threshold Exceeded for Peer %s", deploymentUIDPrefix, peerName)
alertKey := fmt.Sprintf("%s Slot Lag Threshold Exceeded for Peer %s", deploymentUIDPrefix, alertKeys.PeerName)
alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+
`currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)
`currently at %.2fMB!`, deploymentUIDPrefix, slotInfo.SlotName, alertKeys.PeerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) {
for _, alertSenderConfig := range alertSenderConfigs {
if len(alertSenderConfig.AlertForMirrors) > 0 &&
!slices.Contains(alertSenderConfig.AlertForMirrors, alertKeys.FlowName) {
continue
}
if a.checkAndAddAlertToCatalog(ctx,
alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
if alertSenderConfig.Sender.getSlotLagMBAlertThreshold() > 0 {
Expand All @@ -168,7 +190,7 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
}
}

func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
func (a *Alerter) AlertIfOpenConnections(ctx context.Context, alertKeys *AlertKeys,
openConnections *protos.GetOpenConnectionsForUserResult,
) {
alertSenderConfigs, err := a.registerSendersFromPool(ctx)
Expand All @@ -191,17 +213,22 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold
for _, alertSender := range alertSenderConfigs {
if alertSender.Sender.getOpenConnectionsAlertThreshold() > 0 {
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold, alertSender.Sender.getOpenConnectionsAlertThreshold())
lowestOpenConnectionsThreshold = min(lowestOpenConnectionsThreshold,
alertSender.Sender.getOpenConnectionsAlertThreshold())
}
}

alertKey := fmt.Sprintf("%s Max Open Connections Threshold Exceeded for Peer %s", deploymentUIDPrefix, peerName)
alertKey := fmt.Sprintf("%s Max Open Connections Threshold Exceeded for Peer %s", deploymentUIDPrefix, alertKeys.PeerName)
alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+
` has exceeded threshold size of %%d connections, currently at %d connections!`,
deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)
deploymentUIDPrefix, openConnections.UserName, alertKeys.PeerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) {
for _, alertSenderConfig := range alertSenderConfigs {
if len(alertSenderConfig.AlertForMirrors) > 0 &&
!slices.Contains(alertSenderConfig.AlertForMirrors, alertKeys.FlowName) {
continue
}
if a.checkAndAddAlertToCatalog(ctx,
alertSenderConfig.Id, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
if alertSenderConfig.Sender.getOpenConnectionsAlertThreshold() > 0 {
Expand Down
32 changes: 22 additions & 10 deletions flow/cmd/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func (h *FlowRequestHandler) GetAlertConfigs(ctx context.Context, req *protos.GetAlertConfigsRequest) (*protos.GetAlertConfigsResponse, error) {
rows, err := h.pool.Query(ctx, "select id, service_type, service_config, enc_key_id from peerdb_stats.alerting_config")
rows, err := h.pool.Query(ctx, "SELECT id,service_type,service_config,enc_key_id,alert_for_mirrors from peerdb_stats.alerting_config")
if err != nil {
return nil, err
}
Expand All @@ -20,7 +20,7 @@ func (h *FlowRequestHandler) GetAlertConfigs(ctx context.Context, req *protos.Ge
var serviceConfigPayload []byte
var encKeyID string
config := &protos.AlertConfig{}
if err := row.Scan(&config.Id, &config.ServiceType, &serviceConfigPayload, &encKeyID); err != nil {
if err := row.Scan(&config.Id, &config.ServiceType, &serviceConfigPayload, &encKeyID, &config.AlertForMirrors); err != nil {
return nil, err
}
serviceConfig, err := peerdbenv.Decrypt(encKeyID, serviceConfigPayload)
Expand All @@ -42,34 +42,46 @@ func (h *FlowRequestHandler) PostAlertConfig(ctx context.Context, req *protos.Po
if err != nil {
return nil, err
}
serviceConfig, err := key.Encrypt(shared.UnsafeFastStringToReadOnlyBytes(req.ServiceConfig))
serviceConfig, err := key.Encrypt(shared.UnsafeFastStringToReadOnlyBytes(req.Config.ServiceConfig))
if err != nil {
return nil, err
}

if req.Id == -1 {
if req.Config.Id == -1 {
var id int32
if err := h.pool.QueryRow(
ctx,
"insert into peerdb_stats.alerting_config (service_type, service_config, enc_key_id) values ($1, $2, $3) returning id",
req.ServiceType,
`INSERT INTO peerdb_stats.alerting_config (
service_type,
service_config,
enc_key_id,
alert_for_mirrors
) VALUES (
$1,
$2,
$3,
$4
) RETURNING id`,
req.Config.ServiceType,
serviceConfig,
key.ID,
req.Config.AlertForMirrors,
).Scan(&id); err != nil {
return nil, err
}
return &protos.PostAlertConfigResponse{Id: id}, nil
} else if _, err := h.pool.Exec(
ctx,
"update peerdb_stats.alerting_config set service_type = $1, service_config = $2, enc_key_id = $3 where id = $4",
req.ServiceType,
"update peerdb_stats.alerting_config set service_type = $1, service_config = $2, enc_key_id = $3, alert_for_mirrors = $4 where id = $5",
req.Config.ServiceType,
serviceConfig,
key.ID,
req.Id,
req.Config.AlertForMirrors,
req.Config.Id,
); err != nil {
return nil, err
}
return &protos.PostAlertConfigResponse{Id: req.Id}, nil
return &protos.PostAlertConfigResponse{Id: req.Config.Id}, nil
}

func (h *FlowRequestHandler) DeleteAlertConfig(
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,12 @@ func (s *QRepAvroSyncMethod) writeToStage(
obj := bucket.Object(avroFilePath)
w := obj.NewWriter(ctx)

numRecords, err := ocfWriter.WriteOCF(ctx, w)
numRecords, err := ocfWriter.WriteOCF(ctx, w, -1)
if err != nil {
return 0, fmt.Errorf("failed to write records to Avro file on GCS: %w", err)
}
avroFile = &avro.AvroFile{
NumRecords: numRecords,
NumRecords: int(numRecords),
StorageLocation: avro.AvroGCSStorage,
FilePath: avroFilePath,
}
Expand Down
6 changes: 3 additions & 3 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ func (c *ClickhouseConnector) CreateRawTable(ctx context.Context, req *protos.Cr
}, nil
}

func (c *ClickhouseConnector) avroSyncMethod(flowJobName string) *ClickhouseAvroSyncMethod {
func (c *ClickhouseConnector) avroSyncMethod(flowJobName string, env map[string]string) *ClickhouseAvroSyncMethod {
qrepConfig := &protos.QRepConfig{
StagingPath: c.credsProvider.BucketPath,
FlowJobName: flowJobName,
DestinationTableIdentifier: c.getRawTableName(flowJobName),
}
return NewClickhouseAvroSyncMethod(qrepConfig, c)
return NewClickhouseAvroSyncMethod(qrepConfig, c, env)
}

func (c *ClickhouseConnector) syncRecordsViaAvro(
Expand All @@ -87,7 +87,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
}

avroSyncer := c.avroSyncMethod(req.FlowJobName)
avroSyncer := c.avroSyncMethod(req.FlowJobName, req.Env)
numRecords, err := avroSyncer.SyncRecords(ctx, stream, req.FlowJobName, syncBatchID)
if err != nil {
return nil, err
Expand Down
19 changes: 10 additions & 9 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

utils "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
Expand Down Expand Up @@ -237,7 +238,7 @@ func (c *ClickhouseConnector) NormalizeRecords(
}, nil
}

err = c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID)
err = c.copyAvroStagesToDestination(ctx, req.FlowJobName, normBatchID, req.SyncBatchID, req.Env)
if err != nil {
return nil, fmt.Errorf("failed to copy avro stages to destination: %w", err)
}
Expand Down Expand Up @@ -344,8 +345,6 @@ func (c *ClickhouseConnector) NormalizeRecords(
selectQuery.WriteString(tbl)
selectQuery.WriteString("'")

selectQuery.WriteString(" ORDER BY _peerdb_timestamp")

insertIntoSelectQuery := strings.Builder{}
insertIntoSelectQuery.WriteString("INSERT INTO ")
insertIntoSelectQuery.WriteString(tbl)
Expand Down Expand Up @@ -411,26 +410,28 @@ func (c *ClickhouseConnector) getDistinctTableNamesInBatch(
return tableNames, nil
}

func (c *ClickhouseConnector) copyAvroStageToDestination(ctx context.Context, flowJobName string, syncBatchID int64) error {
avroSynvMethod := c.avroSyncMethod(flowJobName)
func (c *ClickhouseConnector) copyAvroStageToDestination(ctx context.Context,
flowJobName string, syncBatchID int64, env map[string]string,
) error {
avroSyncMethod := c.avroSyncMethod(flowJobName, env)
avroFile, err := c.s3Stage.GetAvroStage(ctx, flowJobName, syncBatchID)
if err != nil {
return fmt.Errorf("failed to get avro stage: %w", err)
}
defer avroFile.Cleanup()

err = avroSynvMethod.CopyStageToDestination(ctx, avroFile)
err = avroSyncMethod.CopyStageToDestination(ctx, []*utils.AvroFile{avroFile})
if err != nil {
return fmt.Errorf("failed to copy stage to destination: %w", err)
}
return nil
}

func (c *ClickhouseConnector) copyAvroStagesToDestination(
ctx context.Context, flowJobName string, normBatchID, syncBatchID int64,
func (c *ClickhouseConnector) copyAvroStagesToDestination(ctx context.Context,
flowJobName string, normBatchID, syncBatchID int64, env map[string]string,
) error {
for s := normBatchID + 1; s <= syncBatchID; s++ {
err := c.copyAvroStageToDestination(ctx, flowJobName, s)
err := c.copyAvroStageToDestination(ctx, flowJobName, s, env)
if err != nil {
return fmt.Errorf("failed to copy avro stage to destination: %w", err)
}
Expand Down
Loading

0 comments on commit 11fd8cf

Please sign in to comment.