
Commit

Merge branch 'main' into stable
Amogh-Bharadwaj committed Jun 7, 2024
2 parents 0a17d0f + 12687b4 commit fb6cf99
Showing 62 changed files with 1,380 additions and 629 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/flow.yml
@@ -127,7 +127,7 @@ jobs:
AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }}
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
AZURE_SUBSCRIPTION_ID: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
ENABLE_SQLSERVER_TESTS: true
ENABLE_SQLSERVER_TESTS: "true"
SQLSERVER_HOST: ${{ secrets.SQLSERVER_HOST }}
SQLSERVER_PORT: ${{ secrets.SQLSERVER_PORT }}
SQLSERVER_USER: ${{ secrets.SQLSERVER_USER }}
@@ -138,4 +138,5 @@ jobs:
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
PEERDB_QUEUE_FORCE_TOPIC_CREATION: "true"
ELASTICSEARCH_TEST_ADDRESS: http://localhost:9200
2 changes: 0 additions & 2 deletions docker-compose-dev.yml
@@ -1,5 +1,3 @@
version: "3.9"

name: peerdb-quickstart-dev

x-minio-config: &minio-config
4 changes: 1 addition & 3 deletions docker-compose.yml
@@ -1,5 +1,3 @@
version: "3.9"

name: peerdb-quickstart

x-minio-config: &minio-config
@@ -175,7 +173,7 @@ services:
- flow-api

minio:
image: minio/minio
image: minio/minio:latest
restart: unless-stopped
volumes:
- minio-data:/data
18 changes: 13 additions & 5 deletions flow/activities/flowable.go
@@ -570,19 +570,27 @@ func (a *FlowableActivity) DropFlowDestination(ctx context.Context, config *prot

func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
logger := activity.GetLogger(ctx)
if !peerdbenv.PeerDBEnableWALHeartbeat() {
walHeartbeatEnabled, err := peerdbenv.PeerDBEnableWALHeartbeat(ctx)
if err != nil {
logger.Warn("unable to fetch wal heartbeat config. Skipping walheartbeat send.", slog.Any("error", err))
return err
}
if !walHeartbeatEnabled {
logger.Info("wal heartbeat is disabled")
return nil
}
walHeartbeatStatement, err := peerdbenv.PeerDBWALHeartbeatQuery(ctx)
if err != nil {
logger.Warn("unable to fetch wal heartbeat config. Skipping walheartbeat send.", slog.Any("error", err))
return err
}

pgPeers, err := a.getPostgresPeerConfigs(ctx)
if err != nil {
logger.Warn("[sendwalheartbeat] unable to fetch peers. " +
"Skipping walheartbeat send. Error: " + err.Error())
logger.Warn("[sendwalheartbeat] unable to fetch peers. Skipping walheartbeat send.", slog.Any("error", err))
return err
}

command := peerdbenv.PeerDBWALHeartbeatQuery()
// run above command for each Postgres peer
for _, pgPeer := range pgPeers {
activity.RecordHeartbeat(ctx, pgPeer.Name)
@@ -599,7 +607,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
return
}
defer pgConn.Close()
cmdErr := pgConn.ExecuteCommand(ctx, command)
cmdErr := pgConn.ExecuteCommand(ctx, walHeartbeatStatement)
if cmdErr != nil {
logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, cmdErr))
}
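Most of the Go changes in this commit follow a single pattern: peerdbenv getters that used to return a bare value now take a context.Context and return (value, error), and every caller logs and bails out when the lookup fails. A minimal sketch of why the signature changed is below, assuming a catalog-backed lookup; the dynLookup helper, setting key, and default value are illustrative assumptions, not PeerDB's actual implementation.

```go
package peerdbenv

import (
	"context"
	"strconv"
)

// dynLookup is a hypothetical helper that reads a setting from the catalog,
// falling back to a default when no override is present. The real PeerDB
// lookup may differ; this only illustrates why a config read can now fail.
func dynLookup(ctx context.Context, key string, def string) (string, error) {
	// ... query the catalog's dynamic-settings table here ...
	return def, nil
}

// PeerDBEnableWALHeartbeat mirrors the (value, error) signature used in this
// commit: callers must handle lookup failures explicitly instead of getting
// a silent default. The key name and default are placeholders.
func PeerDBEnableWALHeartbeat(ctx context.Context) (bool, error) {
	raw, err := dynLookup(ctx, "PEERDB_ENABLE_WAL_HEARTBEAT", "false")
	if err != nil {
		return false, err
	}
	return strconv.ParseBool(raw)
}
```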
6 changes: 5 additions & 1 deletion flow/activities/flowable_core.go
@@ -118,7 +118,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
consumedOffset := atomic.Int64{}
consumedOffset.Store(lastOffset)

recordBatchPull := model.NewCDCStream[Items](peerdbenv.PeerDBCDCChannelBufferSize())
channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get CDC channel buffer size: %w", err)
}
recordBatchPull := model.NewCDCStream[Items](int(channelBufferSize))
recordBatchSync := recordBatchPull
if adaptStream != nil {
var err error
21 changes: 16 additions & 5 deletions flow/alerting/alerting.go
@@ -12,7 +12,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"

"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/peerdbenv"
@@ -123,7 +122,7 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo
deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID())
}

defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx)
defaultSlotLagMBAlertThreshold, err := peerdbenv.PeerDBSlotLagMBAlertThreshold(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to get slot lag alert threshold from catalog", slog.Any("error", err))
return
}
// catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead
lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold
for _, alertSender := range alertSenderConfigs {
@@ -171,7 +174,11 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,
}

// same as with slot lag, use lowest threshold for catalog
defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx)
defaultOpenConnectionsThreshold, err := peerdbenv.PeerDBOpenConnectionsAlertThreshold(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to get open connections alert threshold from catalog", slog.Any("error", err))
return
}
lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold
for _, alertSender := range alertSenderConfigs {
if alertSender.Sender.getOpenConnectionsAlertThreshold() > 0 {
@@ -216,7 +223,11 @@ func (a *Alerter) alertToProvider(ctx context.Context, alertSenderConfig AlertSe
// in the past X minutes, where X is configurable and defaults to 15 minutes
// returns true if alert added to catalog, so proceed with processing alerts to slack
func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId int64, alertKey string, alertMessage string) bool {
dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx)
dur, err := peerdbenv.PeerDBAlertingGapMinutesAsDuration(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to get alerting gap duration from catalog", slog.Any("error", err))
return false
}
if dur == 0 {
logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning")
return false
@@ -227,7 +238,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId i
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey, alertConfigId)
var createdTimestamp time.Time
err := row.Scan(&createdTimestamp)
err = row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error()))
return false
31 changes: 24 additions & 7 deletions flow/connectors/bigquery/bigquery.go
@@ -18,7 +18,6 @@ import (
metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
numeric "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
@@ -681,7 +680,10 @@ func (c *BigQueryConnector) SetupNormalizedTable(
}
}

timePartitionEnabled := dynamicconf.PeerDBBigQueryEnableSyncedAtPartitioning(ctx)
timePartitionEnabled, err := peerdbenv.PeerDBBigQueryEnableSyncedAtPartitioning(ctx)
if err != nil {
return false, fmt.Errorf("failed to get dynamic setting for BigQuery time partitioning: %w", err)
}
var timePartitioning *bigquery.TimePartitioning
if timePartitionEnabled && syncedAtColName != "" {
timePartitioning = &bigquery.TimePartitioning{
@@ -747,9 +749,15 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
continue
}

// For a table with replica identity full and a JSON column,
// the equality comparison we do below will fail,
// so we need to use TO_JSON_STRING for those columns
columnIsJSON := make(map[string]bool, len(renameRequest.TableSchema.Columns))
columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns))
for _, col := range renameRequest.TableSchema.Columns {
columnNames = append(columnNames, "`"+col.Name+"`")
quotedCol := "`" + col.Name + "`"
columnNames = append(columnNames, quotedCol)
columnIsJSON[quotedCol] = (col.Type == "json" || col.Type == "jsonb")
}

if req.SoftDeleteColName != nil {
@@ -775,10 +783,19 @@
pkeyOnClauseBuilder := strings.Builder{}
ljWhereClauseBuilder := strings.Builder{}
for idx, col := range pkeyCols {
pkeyOnClauseBuilder.WriteString("_pt.")
pkeyOnClauseBuilder.WriteString(col)
pkeyOnClauseBuilder.WriteString(" = _resync.")
pkeyOnClauseBuilder.WriteString(col)
if columnIsJSON[col] {
// We need to use TO_JSON_STRING for comparing JSON columns
pkeyOnClauseBuilder.WriteString("TO_JSON_STRING(_pt.")
pkeyOnClauseBuilder.WriteString(col)
pkeyOnClauseBuilder.WriteString(")=TO_JSON_STRING(_resync.")
pkeyOnClauseBuilder.WriteString(col)
pkeyOnClauseBuilder.WriteString(")")
} else {
pkeyOnClauseBuilder.WriteString("_pt.")
pkeyOnClauseBuilder.WriteString(col)
pkeyOnClauseBuilder.WriteString("=_resync.")
pkeyOnClauseBuilder.WriteString(col)
}

ljWhereClauseBuilder.WriteString("_resync.")
ljWhereClauseBuilder.WriteString(col)
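To make the new ON-clause branch concrete, the predicate built for each primary-key column reduces to the small sketch below. The _pt and _resync aliases come from the diff; the helper name and sample column are hypothetical.

```go
package main

import "fmt"

// pkeyPredicate mirrors the branch added in RenameTables: BigQuery cannot
// compare JSON values with `=`, so JSON primary-key columns are compared via
// their TO_JSON_STRING form instead.
func pkeyPredicate(col string, isJSON bool) string {
	if isJSON {
		return "TO_JSON_STRING(_pt." + col + ")=TO_JSON_STRING(_resync." + col + ")"
	}
	return "_pt." + col + "=_resync." + col
}

func main() {
	fmt.Println(pkeyPredicate("`payload`", true))
	// Output: TO_JSON_STRING(_pt.`payload`)=TO_JSON_STRING(_resync.`payload`)
}
```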
5 changes: 4 additions & 1 deletion flow/connectors/clickhouse/clickhouse.go
@@ -115,7 +115,10 @@ func NewClickhouseConnector(
bucketPathSuffix := fmt.Sprintf("%s/%s",
url.PathEscape(deploymentUID), url.PathEscape(flowName))
// Fallback: Get S3 credentials from environment
awsBucketName := peerdbenv.PeerDBClickhouseAWSS3BucketName()
awsBucketName, err := peerdbenv.PeerDBClickhouseAWSS3BucketName(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get PeerDB Clickhouse Bucket Name: %w", err)
}
if awsBucketName == "" {
return nil, errors.New("PeerDB Clickhouse Bucket Name not set")
}
9 changes: 6 additions & 3 deletions flow/connectors/connelasticsearch/elasticsearch.go
@@ -146,10 +146,13 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
defer cacheCloser()

flushLoopDone := make(chan struct{})
// we only update lastSeenLSN in the OnSuccess call, so this should be safe even if race
// between loop breaking and closing flushLoopDone
go func() {
ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds())
flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx)
if err != nil {
esc.logger.Warn("[elasticsearch] failed to get flush timeout, no periodic flushing", slog.Any("error", err))
return
}
ticker := time.NewTicker(flushTimeout)
defer ticker.Stop()

for {
6 changes: 5 additions & 1 deletion flow/connectors/eventhub/eventhub.go
@@ -184,7 +184,11 @@ func (c *EventHubConnector) processBatch(
batchPerTopic := NewHubBatches(c.hubManager)
toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false)

ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds())
flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get flush timeout: %w", err)
}
ticker := time.NewTicker(flushTimeout)
defer ticker.Stop()

lastSeenLSN := int64(0)
52 changes: 46 additions & 6 deletions flow/connectors/kafka/kafka.go
@@ -3,11 +3,14 @@ package connkafka
import (
"context"
"crypto/tls"
"errors"
"fmt"
"log/slog"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
@@ -40,7 +43,6 @@ func NewKafkaConnector(
kgo.SeedBrokers(config.Servers...),
kgo.AllowAutoTopicCreation(),
kgo.WithLogger(kslog.New(slog.Default())), // TODO use logger.LoggerFromCtx
kgo.SoftwareNameAndVersion("peerdb", peerdbenv.PeerDBVersionShaShort()),
)
if !config.DisableTls {
optionalOpts = append(optionalOpts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12}))
@@ -72,6 +74,11 @@ }
return nil, fmt.Errorf("unsupported SASL mechanism: %s", config.Sasl)
}
}
force, err := peerdbenv.PeerDBQueueForceTopicCreation(ctx)
if err == nil && force {
optionalOpts = append(optionalOpts, kgo.UnknownTopicRetries(0))
}

client, err := kgo.NewClient(optionalOpts...)
if err != nil {
return nil, fmt.Errorf("failed to create kafka client: %w", err)
@@ -176,7 +183,12 @@ func (c *KafkaConnector) createPool(
lastSeenLSN *atomic.Int64,
queueErr func(error),
) (*utils.LPool[poolResult], error) {
return utils.LuaPool(func() (*lua.LState, error) {
maxSize, err := peerdbenv.PeerDBQueueParallelism(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get parallelism: %w", err)
}

return utils.LuaPool(int(maxSize), func() (*lua.LState, error) {
ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) {
_ = c.LogFlowInfo(ctx, flowJobName, s)
}))
@@ -197,13 +209,36 @@ func (c *KafkaConnector) createPool(
recordCounter := atomic.Int32{}
recordCounter.Store(lenRecords)
for _, kr := range result.records {
c.client.Produce(ctx, kr, func(_ *kgo.Record, err error) {
var handler func(*kgo.Record, error)
handler = func(_ *kgo.Record, err error) {
if err != nil {
queueErr(err)
var success bool
if errors.Is(err, kerr.UnknownTopicOrPartition) {
force, envErr := peerdbenv.PeerDBQueueForceTopicCreation(ctx)
if envErr == nil && force {
c.logger.Info("[kafka] force topic creation", slog.String("topic", kr.Topic))
_, err := kadm.NewClient(c.client).CreateTopic(ctx, 1, 3, nil, kr.Topic)
if err != nil && !errors.Is(err, kerr.TopicAlreadyExists) {
c.logger.Warn("[kafka] topic create error", slog.Any("error", err))
queueErr(err)
return
}
success = true
}
} else {
c.logger.Warn("[kafka] produce error", slog.Any("error", err))
}
if success {
time.Sleep(time.Second) // topic creation can take time to propagate, throttle
c.client.Produce(ctx, kr, handler)
} else {
queueErr(err)
}
} else if recordCounter.Add(-1) == 0 && lastSeenLSN != nil {
shared.AtomicInt64Max(lastSeenLSN, result.lsn)
}
})
}
c.client.Produce(ctx, kr, handler)
}
}
})
@@ -223,7 +258,12 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords
tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
flushLoopDone := make(chan struct{})
go func() {
ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds())
flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx)
if err != nil {
c.logger.Warn("[kafka] failed to get flush timeout, no periodic flushing", slog.Any("error", err))
return
}
ticker := time.NewTicker(flushTimeout)
defer ticker.Stop()

for {
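The topic-auto-creation path in the delivery callback above can be distilled into a standalone helper for readability. This is a simplified restatement using the same franz-go APIs, not additional PeerDB code: it drops the force-creation toggle and the lastSeenLSN bookkeeping and keeps only the create-then-retry flow.

```go
package main

import (
	"context"
	"errors"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kgo"
)

// produceWithTopicCreate retries the produce after creating a missing topic
// (1 partition, replication factor 3, matching the diff). Any other error is
// passed to onErr.
func produceWithTopicCreate(ctx context.Context, client *kgo.Client, rec *kgo.Record, onErr func(error)) {
	var handler func(*kgo.Record, error)
	handler = func(_ *kgo.Record, err error) {
		if err == nil {
			return
		}
		if errors.Is(err, kerr.UnknownTopicOrPartition) {
			_, cerr := kadm.NewClient(client).CreateTopic(ctx, 1, 3, nil, rec.Topic)
			if cerr != nil && !errors.Is(cerr, kerr.TopicAlreadyExists) {
				onErr(cerr)
				return
			}
			time.Sleep(time.Second) // give topic metadata time to propagate
			client.Produce(ctx, rec, handler)
			return
		}
		onErr(err)
	}
	client.Produce(ctx, rec, handler)
}

func main() {}
```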
5 changes: 4 additions & 1 deletion flow/connectors/postgres/cdc.go
@@ -312,7 +312,10 @@ func PullCdcRecords[Items model.Items](
}

var standByLastLogged time.Time
cdcRecordsStorage := utils.NewCDCStore[Items](p.flowJobName)
cdcRecordsStorage, err := utils.NewCDCStore[Items](ctx, p.flowJobName)
if err != nil {
return err
}
defer func() {
if cdcRecordsStorage.IsEmpty() {
records.SignalAsEmpty()
