diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index fb1774a769..266492fb56 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -127,7 +127,7 @@ jobs: AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }} AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }} AZURE_SUBSCRIPTION_ID: ${{ secrets.AZURE_SUBSCRIPTION_ID }} - ENABLE_SQLSERVER_TESTS: true + ENABLE_SQLSERVER_TESTS: "true" SQLSERVER_HOST: ${{ secrets.SQLSERVER_HOST }} SQLSERVER_PORT: ${{ secrets.SQLSERVER_PORT }} SQLSERVER_USER: ${{ secrets.SQLSERVER_USER }} @@ -138,4 +138,5 @@ jobs: PEERDB_CATALOG_USER: postgres PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres + PEERDB_QUEUE_FORCE_TOPIC_CREATION: "true" ELASTICSEARCH_TEST_ADDRESS: http://localhost:9200 diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 1868c755bf..c62fa71863 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -1,5 +1,3 @@ -version: "3.9" - name: peerdb-quickstart-dev x-minio-config: &minio-config diff --git a/docker-compose.yml b/docker-compose.yml index 1b1617233d..75b48cf53a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.9" - name: peerdb-quickstart x-minio-config: &minio-config @@ -175,7 +173,7 @@ services: - flow-api minio: - image: minio/minio + image: minio/minio:latest restart: unless-stopped volumes: - minio-data:/data diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 844c60c088..14517b09cd 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -570,19 +570,27 @@ func (a *FlowableActivity) DropFlowDestination(ctx context.Context, config *prot func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { logger := activity.GetLogger(ctx) - if !peerdbenv.PeerDBEnableWALHeartbeat() { + walHeartbeatEnabled, err := peerdbenv.PeerDBEnableWALHeartbeat(ctx) + if err != nil { + logger.Warn("unable to fetch wal heartbeat config. 
Skipping walheartbeat send.", slog.Any("error", err)) + return err + } + if !walHeartbeatEnabled { logger.Info("wal heartbeat is disabled") return nil } + walHeartbeatStatement, err := peerdbenv.PeerDBWALHeartbeatQuery(ctx) + if err != nil { + logger.Warn("unable to fetch wal heartbeat config. Skipping walheartbeat send.", slog.Any("error", err)) + return err + } pgPeers, err := a.getPostgresPeerConfigs(ctx) if err != nil { - logger.Warn("[sendwalheartbeat] unable to fetch peers. " + - "Skipping walheartbeat send. Error: " + err.Error()) + logger.Warn("[sendwalheartbeat] unable to fetch peers. Skipping walheartbeat send.", slog.Any("error", err)) return err } - command := peerdbenv.PeerDBWALHeartbeatQuery() // run above command for each Postgres peer for _, pgPeer := range pgPeers { activity.RecordHeartbeat(ctx, pgPeer.Name) @@ -599,7 +607,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { return } defer pgConn.Close() - cmdErr := pgConn.ExecuteCommand(ctx, command) + cmdErr := pgConn.ExecuteCommand(ctx, walHeartbeatStatement) if cmdErr != nil { logger.Warn(fmt.Sprintf("could not send walheartbeat to peer %v: %v", pgPeer.Name, cmdErr)) } diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 24a98bdca6..ce74f4c3da 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -118,7 +118,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon consumedOffset := atomic.Int64{} consumedOffset.Store(lastOffset) - recordBatchPull := model.NewCDCStream[Items](peerdbenv.PeerDBCDCChannelBufferSize()) + channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get CDC channel buffer size: %w", err) + } + recordBatchPull := model.NewCDCStream[Items](int(channelBufferSize)) recordBatchSync := recordBatchPull if adaptStream != nil { var err error diff --git a/flow/alerting/alerting.go 
b/flow/alerting/alerting.go index 9405cbf4bc..b5f21ffb57 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -12,7 +12,6 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/peerdbenv" @@ -123,7 +122,11 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo deploymentUIDPrefix = fmt.Sprintf("[%s] ", peerdbenv.PeerDBDeploymentUID()) } - defaultSlotLagMBAlertThreshold := dynamicconf.PeerDBSlotLagMBAlertThreshold(ctx) + defaultSlotLagMBAlertThreshold, err := peerdbenv.PeerDBSlotLagMBAlertThreshold(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to get slot lag alert threshold from catalog", slog.Any("error", err)) + return + } // catalog cannot use default threshold to space alerts properly, use the lowest set threshold instead lowestSlotLagMBAlertThreshold := defaultSlotLagMBAlertThreshold for _, alertSender := range alertSenderConfigs { @@ -171,7 +174,11 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, } // same as with slot lag, use lowest threshold for catalog - defaultOpenConnectionsThreshold := dynamicconf.PeerDBOpenConnectionsAlertThreshold(ctx) + defaultOpenConnectionsThreshold, err := peerdbenv.PeerDBOpenConnectionsAlertThreshold(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to get open connections alert threshold from catalog", slog.Any("error", err)) + return + } lowestOpenConnectionsThreshold := defaultOpenConnectionsThreshold for _, alertSender := range alertSenderConfigs { if alertSender.Sender.getOpenConnectionsAlertThreshold() > 0 { @@ -216,7 +223,11 @@ func (a *Alerter) alertToProvider(ctx context.Context, alertSenderConfig AlertSe // in the past X minutes, where X is configurable and defaults to 15 minutes // returns true if alert added to catalog, so 
proceed with processing alerts to slack func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId int64, alertKey string, alertMessage string) bool { - dur := dynamicconf.PeerDBAlertingGapMinutesAsDuration(ctx) + dur, err := peerdbenv.PeerDBAlertingGapMinutesAsDuration(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to get alerting gap duration from catalog", slog.Any("error", err)) + return false + } if dur == 0 { logger.LoggerFromCtx(ctx).Warn("Alerting disabled via environment variable, returning") return false @@ -227,7 +238,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId i ORDER BY created_timestamp DESC LIMIT 1`, alertKey, alertConfigId) var createdTimestamp time.Time - err := row.Scan(&createdTimestamp) + err = row.Scan(&createdTimestamp) if err != nil && err != pgx.ErrNoRows { logger.LoggerFromCtx(ctx).Warn("failed to send alert: ", slog.String("err", err.Error())) return false diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 9b10b1f32d..bcfffaae79 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -18,7 +18,6 @@ import ( metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata" "github.com/PeerDB-io/peer-flow/connectors/utils" numeric "github.com/PeerDB-io/peer-flow/datatypes" - "github.com/PeerDB-io/peer-flow/dynamicconf" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/model" @@ -681,7 +680,10 @@ func (c *BigQueryConnector) SetupNormalizedTable( } } - timePartitionEnabled := dynamicconf.PeerDBBigQueryEnableSyncedAtPartitioning(ctx) + timePartitionEnabled, err := peerdbenv.PeerDBBigQueryEnableSyncedAtPartitioning(ctx) + if err != nil { + return false, fmt.Errorf("failed to get dynamic setting for BigQuery time partitioning: %w", err) + } var timePartitioning *bigquery.TimePartitioning if 
timePartitionEnabled && syncedAtColName != "" { timePartitioning = &bigquery.TimePartitioning{ @@ -747,9 +749,15 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename continue } + // For a table with replica identity full and a JSON column + // the equals to comparison we do down below will fail + // so we need to use TO_JSON_STRING for those columns + columnIsJSON := make(map[string]bool, len(renameRequest.TableSchema.Columns)) columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns)) for _, col := range renameRequest.TableSchema.Columns { - columnNames = append(columnNames, "`"+col.Name+"`") + quotedCol := "`" + col.Name + "`" + columnNames = append(columnNames, quotedCol) + columnIsJSON[quotedCol] = (col.Type == "json" || col.Type == "jsonb") } if req.SoftDeleteColName != nil { @@ -775,10 +783,19 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename pkeyOnClauseBuilder := strings.Builder{} ljWhereClauseBuilder := strings.Builder{} for idx, col := range pkeyCols { - pkeyOnClauseBuilder.WriteString("_pt.") - pkeyOnClauseBuilder.WriteString(col) - pkeyOnClauseBuilder.WriteString(" = _resync.") - pkeyOnClauseBuilder.WriteString(col) + if columnIsJSON[col] { + // We need to use TO_JSON_STRING for comparing JSON columns + pkeyOnClauseBuilder.WriteString("TO_JSON_STRING(_pt.") + pkeyOnClauseBuilder.WriteString(col) + pkeyOnClauseBuilder.WriteString(")=TO_JSON_STRING(_resync.") + pkeyOnClauseBuilder.WriteString(col) + pkeyOnClauseBuilder.WriteString(")") + } else { + pkeyOnClauseBuilder.WriteString("_pt.") + pkeyOnClauseBuilder.WriteString(col) + pkeyOnClauseBuilder.WriteString("=_resync.") + pkeyOnClauseBuilder.WriteString(col) + } ljWhereClauseBuilder.WriteString("_resync.") ljWhereClauseBuilder.WriteString(col) diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 18dc75e64b..e089c88b12 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ 
b/flow/connectors/clickhouse/clickhouse.go @@ -115,7 +115,10 @@ func NewClickhouseConnector( bucketPathSuffix := fmt.Sprintf("%s/%s", url.PathEscape(deploymentUID), url.PathEscape(flowName)) // Fallback: Get S3 credentials from environment - awsBucketName := peerdbenv.PeerDBClickhouseAWSS3BucketName() + awsBucketName, err := peerdbenv.PeerDBClickhouseAWSS3BucketName(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get PeerDB Clickhouse Bucket Name: %w", err) + } if awsBucketName == "" { return nil, errors.New("PeerDB Clickhouse Bucket Name not set") } diff --git a/flow/connectors/connelasticsearch/elasticsearch.go b/flow/connectors/connelasticsearch/elasticsearch.go index f9f6c1819f..e0ed2ef270 100644 --- a/flow/connectors/connelasticsearch/elasticsearch.go +++ b/flow/connectors/connelasticsearch/elasticsearch.go @@ -146,10 +146,13 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context, defer cacheCloser() flushLoopDone := make(chan struct{}) - // we only update lastSeenLSN in the OnSuccess call, so this should be safe even if race - // between loop breaking and closing flushLoopDone go func() { - ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) + flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx) + if err != nil { + esc.logger.Warn("[elasticsearch] failed to get flush timeout, no periodic flushing", slog.Any("error", err)) + return + } + ticker := time.NewTicker(flushTimeout) defer ticker.Stop() for { diff --git a/flow/connectors/eventhub/eventhub.go b/flow/connectors/eventhub/eventhub.go index 1182f4d413..0877f256f7 100644 --- a/flow/connectors/eventhub/eventhub.go +++ b/flow/connectors/eventhub/eventhub.go @@ -184,7 +184,11 @@ func (c *EventHubConnector) processBatch( batchPerTopic := NewHubBatches(c.hubManager) toJSONOpts := model.NewToJSONOptions(c.config.UnnestColumns, false) - ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) + flushTimeout, err := 
peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx) + if err != nil { + return 0, fmt.Errorf("failed to get flush timeout: %w", err) + } + ticker := time.NewTicker(flushTimeout) defer ticker.Stop() lastSeenLSN := int64(0) diff --git a/flow/connectors/kafka/kafka.go b/flow/connectors/kafka/kafka.go index ec7decc50c..1604927dcb 100644 --- a/flow/connectors/kafka/kafka.go +++ b/flow/connectors/kafka/kafka.go @@ -3,11 +3,14 @@ package connkafka import ( "context" "crypto/tls" + "errors" "fmt" "log/slog" "sync/atomic" "time" + "github.com/twmb/franz-go/pkg/kadm" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/sasl/plain" "github.com/twmb/franz-go/pkg/sasl/scram" @@ -40,7 +43,6 @@ func NewKafkaConnector( kgo.SeedBrokers(config.Servers...), kgo.AllowAutoTopicCreation(), kgo.WithLogger(kslog.New(slog.Default())), // TODO use logger.LoggerFromCtx - kgo.SoftwareNameAndVersion("peerdb", peerdbenv.PeerDBVersionShaShort()), ) if !config.DisableTls { optionalOpts = append(optionalOpts, kgo.DialTLSConfig(&tls.Config{MinVersion: tls.VersionTLS12})) @@ -72,6 +74,11 @@ func NewKafkaConnector( return nil, fmt.Errorf("unsupported SASL mechanism: %s", config.Sasl) } } + force, err := peerdbenv.PeerDBQueueForceTopicCreation(ctx) + if err == nil && force { + optionalOpts = append(optionalOpts, kgo.UnknownTopicRetries(0)) + } + client, err := kgo.NewClient(optionalOpts...) 
if err != nil { return nil, fmt.Errorf("failed to create kafka client: %w", err) @@ -176,7 +183,12 @@ func (c *KafkaConnector) createPool( lastSeenLSN *atomic.Int64, queueErr func(error), ) (*utils.LPool[poolResult], error) { - return utils.LuaPool(func() (*lua.LState, error) { + maxSize, err := peerdbenv.PeerDBQueueParallelism(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get parallelism: %w", err) + } + + return utils.LuaPool(int(maxSize), func() (*lua.LState, error) { ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { _ = c.LogFlowInfo(ctx, flowJobName, s) })) @@ -197,13 +209,36 @@ func (c *KafkaConnector) createPool( recordCounter := atomic.Int32{} recordCounter.Store(lenRecords) for _, kr := range result.records { - c.client.Produce(ctx, kr, func(_ *kgo.Record, err error) { + var handler func(*kgo.Record, error) + handler = func(_ *kgo.Record, err error) { if err != nil { - queueErr(err) + var success bool + if errors.Is(err, kerr.UnknownTopicOrPartition) { + force, envErr := peerdbenv.PeerDBQueueForceTopicCreation(ctx) + if envErr == nil && force { + c.logger.Info("[kafka] force topic creation", slog.String("topic", kr.Topic)) + _, err := kadm.NewClient(c.client).CreateTopic(ctx, 1, 3, nil, kr.Topic) + if err != nil && !errors.Is(err, kerr.TopicAlreadyExists) { + c.logger.Warn("[kafka] topic create error", slog.Any("error", err)) + queueErr(err) + return + } + success = true + } + } else { + c.logger.Warn("[kafka] produce error", slog.Any("error", err)) + } + if success { + time.Sleep(time.Second) // topic creation can take time to propagate, throttle + c.client.Produce(ctx, kr, handler) + } else { + queueErr(err) + } } else if recordCounter.Add(-1) == 0 && lastSeenLSN != nil { shared.AtomicInt64Max(lastSeenLSN, result.lsn) } - }) + } + c.client.Produce(ctx, kr, handler) } } }) @@ -223,7 +258,12 @@ func (c *KafkaConnector) SyncRecords(ctx context.Context, req *model.SyncRecords tableNameRowsMapping := 
utils.InitialiseTableRowsMap(req.TableMappings) flushLoopDone := make(chan struct{}) go func() { - ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) + flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx) + if err != nil { + c.logger.Warn("[kafka] failed to get flush timeout, no periodic flushing", slog.Any("error", err)) + return + } + ticker := time.NewTicker(flushTimeout) defer ticker.Stop() for { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index cfb1664bf5..1c39cf77bc 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -312,7 +312,10 @@ func PullCdcRecords[Items model.Items]( } var standByLastLogged time.Time - cdcRecordsStorage := utils.NewCDCStore[Items](p.flowJobName) + cdcRecordsStorage, err := utils.NewCDCStore[Items](ctx, p.flowJobName) + if err != nil { + return err + } defer func() { if cdcRecordsStorage.IsEmpty() { records.SignalAsEmpty() diff --git a/flow/connectors/pubsub/pubsub.go b/flow/connectors/pubsub/pubsub.go index 7bfe41a5e2..f097424dc9 100644 --- a/flow/connectors/pubsub/pubsub.go +++ b/flow/connectors/pubsub/pubsub.go @@ -138,7 +138,12 @@ func (c *PubSubConnector) createPool( publish chan<- publishResult, queueErr func(error), ) (*utils.LPool[poolResult], error) { - return utils.LuaPool(func() (*lua.LState, error) { + maxSize, err := peerdbenv.PeerDBQueueParallelism(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get parallelism: %w", err) + } + + return utils.LuaPool(int(maxSize), func() (*lua.LState, error) { ls, err := utils.LoadScript(ctx, script, utils.LuaPrintFn(func(s string) { _ = c.LogFlowInfo(ctx, flowJobName, s) })) @@ -157,14 +162,20 @@ func (c *PubSubConnector) createPool( topicClient.EnableMessageOrdering = true } - exists, err := topicClient.Exists(ctx) - if err != nil { - return nil, fmt.Errorf("error checking if topic exists: %w", err) + force, envErr := peerdbenv.PeerDBQueueForceTopicCreation(ctx) + if envErr != 
nil { + return nil, envErr } - if !exists { - topicClient, err = c.client.CreateTopic(ctx, message.Topic) + if force { + exists, err := topicClient.Exists(ctx) if err != nil { - return nil, fmt.Errorf("error creating topic: %w", err) + return nil, fmt.Errorf("error checking if topic exists: %w", err) + } + if !exists { + topicClient, err = c.client.CreateTopic(ctx, message.Topic) + if err != nil { + return nil, fmt.Errorf("error creating topic: %w", err) + } } } return topicClient, nil @@ -262,7 +273,12 @@ func (c *PubSubConnector) SyncRecords(ctx context.Context, req *model.SyncRecord flushLoopDone := make(chan struct{}) go func() { - ticker := time.NewTicker(peerdbenv.PeerDBQueueFlushTimeoutSeconds()) + flushTimeout, err := peerdbenv.PeerDBQueueFlushTimeoutSeconds(ctx) + if err != nil { + c.logger.Warn("[pubsub] failed to get flush timeout, no periodic flushing", slog.Any("error", err)) + return + } + ticker := time.NewTicker(flushTimeout) defer ticker.Stop() for { diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 6a179591e3..58522b207c 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -535,7 +535,11 @@ func (c *SnowflakeConnector) mergeTablesForBatch( var totalRowsAffected int64 = 0 g, gCtx := errgroup.WithContext(ctx) - g.SetLimit(peerdbenv.PeerDBSnowflakeMergeParallelism()) + mergeParallelism, err := peerdbenv.PeerDBSnowflakeMergeParallelism(ctx) + if err != nil { + return fmt.Errorf("failed to get merge parallelism: %w", err) + } + g.SetLimit(int(mergeParallelism)) mergeGen := &mergeStmtGenerator{ rawTableName: getRawTableIdentifier(flowName), diff --git a/flow/connectors/utils/cdc_store.go b/flow/connectors/utils/cdc_store.go index a98f10c656..4de95db70f 100644 --- a/flow/connectors/utils/cdc_store.go +++ b/flow/connectors/utils/cdc_store.go @@ -2,6 +2,7 @@ package utils import ( "bytes" + "context" "encoding/gob" "errors" "fmt" @@ -43,16 +44,24 @@ type 
cdcStore[Items model.Items] struct { numRecordsSwitchThreshold int } -func NewCDCStore[Items model.Items](flowJobName string) *cdcStore[Items] { +func NewCDCStore[Items model.Items](ctx context.Context, flowJobName string) (*cdcStore[Items], error) { + numRecordsSwitchThreshold, err := peerdbenv.PeerDBCDCDiskSpillRecordsThreshold(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get CDC disk spill records threshold: %w", err) + } + memPercent, err := peerdbenv.PeerDBCDCDiskSpillMemPercentThreshold(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get CDC disk spill memory percent threshold: %w", err) + } + return &cdcStore[Items]{ inMemoryRecords: make(map[model.TableWithPkey]model.Record[Items]), pebbleDB: nil, numRecords: atomic.Int32{}, flowJobName: flowJobName, dbFolderName: fmt.Sprintf("%s/%s_%s", os.TempDir(), flowJobName, shared.RandomString(8)), - numRecordsSwitchThreshold: peerdbenv.PeerDBCDCDiskSpillRecordsThreshold(), + numRecordsSwitchThreshold: int(numRecordsSwitchThreshold), memThresholdBytes: func() uint64 { - memPercent := peerdbenv.PeerDBCDCDiskSpillMemPercentThreshold() maxMemBytes := peerdbenv.PeerDBFlowWorkerMaxMemBytes() if memPercent > 0 && maxMemBytes > 0 { return maxMemBytes * uint64(memPercent) / 100 @@ -61,7 +70,7 @@ func NewCDCStore[Items model.Items](flowJobName string) *cdcStore[Items] { }(), thresholdReason: "", memStats: []metrics.Sample{{Name: "/memory/classes/heap/objects:bytes"}}, - } + }, nil } func init() { diff --git a/flow/connectors/utils/cdc_store_test.go b/flow/connectors/utils/cdc_store_test.go index ab6560f3d1..a278693dfb 100644 --- a/flow/connectors/utils/cdc_store_test.go +++ b/flow/connectors/utils/cdc_store_test.go @@ -1,6 +1,7 @@ package utils import ( + "context" "crypto/rand" "log/slog" "testing" @@ -67,11 +68,12 @@ func genKeyAndRec(t *testing.T) (model.TableWithPkey, model.Record[model.RecordI func TestSingleRecord(t *testing.T) { t.Parallel() - cdcRecordsStore := 
NewCDCStore[model.RecordItems]("test_single_record") + cdcRecordsStore, err := NewCDCStore[model.RecordItems](context.Background(), "test_single_record") + require.NoError(t, err) cdcRecordsStore.numRecordsSwitchThreshold = 10 key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(slog.Default(), key, rec) + err = cdcRecordsStore.Set(slog.Default(), key, rec) require.NoError(t, err) // should not spill into DB require.Len(t, cdcRecordsStore.inMemoryRecords, 1) @@ -87,7 +89,8 @@ func TestSingleRecord(t *testing.T) { func TestRecordsTillSpill(t *testing.T) { t.Parallel() - cdcRecordsStore := NewCDCStore[model.RecordItems]("test_records_till_spill") + cdcRecordsStore, err := NewCDCStore[model.RecordItems](context.Background(), "test_records_till_spill") + require.NoError(t, err) cdcRecordsStore.numRecordsSwitchThreshold = 10 // add records upto set limit @@ -101,7 +104,7 @@ func TestRecordsTillSpill(t *testing.T) { // this record should be spilled to DB key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(slog.Default(), key, rec) + err = cdcRecordsStore.Set(slog.Default(), key, rec) require.NoError(t, err) _, ok := cdcRecordsStore.inMemoryRecords[key] require.False(t, ok) @@ -118,11 +121,12 @@ func TestRecordsTillSpill(t *testing.T) { func TestTimeAndDecimalEncoding(t *testing.T) { t.Parallel() - cdcRecordsStore := NewCDCStore[model.RecordItems]("test_time_encoding") + cdcRecordsStore, err := NewCDCStore[model.RecordItems](context.Background(), "test_time_encoding") + require.NoError(t, err) cdcRecordsStore.numRecordsSwitchThreshold = 0 key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(slog.Default(), key, rec) + err = cdcRecordsStore.Set(slog.Default(), key, rec) require.NoError(t, err) retreived, ok, err := cdcRecordsStore.Get(key) @@ -139,11 +143,12 @@ func TestTimeAndDecimalEncoding(t *testing.T) { func TestNullKeyDoesntStore(t *testing.T) { t.Parallel() - cdcRecordsStore := NewCDCStore[model.RecordItems]("test_time_encoding") + cdcRecordsStore, err 
:= NewCDCStore[model.RecordItems](context.Background(), "test_time_encoding") + require.NoError(t, err) cdcRecordsStore.numRecordsSwitchThreshold = 0 key, rec := genKeyAndRec(t) - err := cdcRecordsStore.Set(slog.Default(), model.TableWithPkey{}, rec) + err = cdcRecordsStore.Set(slog.Default(), model.TableWithPkey{}, rec) require.NoError(t, err) retreived, ok, err := cdcRecordsStore.Get(key) diff --git a/flow/connectors/utils/lua.go b/flow/connectors/utils/lua.go index f1d82f373f..24c37394d7 100644 --- a/flow/connectors/utils/lua.go +++ b/flow/connectors/utils/lua.go @@ -5,12 +5,11 @@ import ( "fmt" "strings" - "github.com/yuin/gopher-lua" + lua "github.com/yuin/gopher-lua" "github.com/PeerDB-io/gluaflatbuffers" "github.com/PeerDB-io/gluajson" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/pua" "github.com/PeerDB-io/peer-flow/shared" ) @@ -109,8 +108,7 @@ type LPool[T any] struct { closed bool } -func LuaPool[T any](cons func() (*lua.LState, error), merge func(T)) (*LPool[T], error) { - maxSize := peerdbenv.PeerDBQueueParallelism() +func LuaPool[T any](maxSize int, cons func() (*lua.LState, error), merge func(T)) (*LPool[T], error) { returns := make(chan (<-chan T), maxSize) wait := make(chan struct{}) go func() { diff --git a/flow/dynamicconf/dynamicconf.go b/flow/dynamicconf/dynamicconf.go deleted file mode 100644 index aedbce65ce..0000000000 --- a/flow/dynamicconf/dynamicconf.go +++ /dev/null @@ -1,105 +0,0 @@ -package dynamicconf - -import ( - "context" - "strconv" - "time" - - "github.com/jackc/pgx/v5/pgtype" - "github.com/jackc/pgx/v5/pgxpool" - - "github.com/PeerDB-io/peer-flow/logger" - "github.com/PeerDB-io/peer-flow/peerdbenv" -) - -func dynamicConfKeyExists(ctx context.Context, conn *pgxpool.Pool, key string) bool { - var exists pgtype.Bool - query := "SELECT EXISTS(SELECT 1 FROM alerting_settings WHERE config_name = $1)" - err := conn.QueryRow(ctx, query, key).Scan(&exists) - if err != 
nil { - logger.LoggerFromCtx(ctx).Error("Failed to check if key exists: %v", err) - return false - } - - return exists.Bool -} - -func dynamicConfUint32(ctx context.Context, key string, defaultValue uint32) uint32 { - conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue - } - - var value pgtype.Text - query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue - } - - result, err := strconv.ParseUint(value.String, 10, 32) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to parse uint32: %v", err) - return defaultValue - } - - return uint32(result) -} - -func dynamicConfBool(ctx context.Context, key string, defaultValue bool) bool { - conn, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool: %v", err) - return defaultValue - } - - if !dynamicConfKeyExists(ctx, conn, key) { - return defaultValue - } - - var value pgtype.Text - query := "SELECT config_value FROM alerting_settings WHERE config_name = $1" - err = conn.QueryRow(ctx, query, key).Scan(&value) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to get key: %v", err) - return defaultValue - } - - result, err := strconv.ParseBool(value.String) - if err != nil { - logger.LoggerFromCtx(ctx).Error("Failed to parse bool: %v", err) - return defaultValue - } - - return result -} - -// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely -func PeerDBSlotLagMBAlertThreshold(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD", 5000) -} - -// PEERDB_ALERTING_GAP_MINUTES, 
0 disables all alerting entirely -func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) time.Duration { - why := int64(dynamicConfUint32(ctx, "PEERDB_ALERTING_GAP_MINUTES", 15)) - return time.Duration(why) * time.Minute -} - -// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely -func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) uint32 { - return dynamicConfUint32(ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD", 5) -} - -// PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS, for creating target tables with -// partitioning by _PEERDB_SYNCED_AT column -// If true, the target tables will be partitioned by _PEERDB_SYNCED_AT column -// If false, the target tables will not be partitioned -func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context) bool { - return dynamicConfBool(ctx, "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS", false) -} diff --git a/flow/e2e/pubsub/pubsub_test.go b/flow/e2e/pubsub/pubsub_test.go index df6809b819..714acff0b3 100644 --- a/flow/e2e/pubsub/pubsub_test.go +++ b/flow/e2e/pubsub/pubsub_test.go @@ -159,7 +159,6 @@ func (s PubSubSuite) TestCreateTopic() { require.NoError(s.t, err) topic := psclient.Topic(flowName) exists, err := topic.Exists(context.Background()) - s.t.Log("WWWW exists", exists) require.NoError(s.t, err) return exists }) diff --git a/flow/go.mod b/flow/go.mod index c7109374af..abba2c42f2 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -1,6 +1,6 @@ module github.com/PeerDB-io/peer-flow -go 1.22.3 +go 1.22.4 require ( cloud.google.com/go v0.114.0 @@ -45,6 +45,7 @@ require ( github.com/snowflakedb/gosnowflake v1.10.1 github.com/stretchr/testify v1.9.0 github.com/twmb/franz-go v1.17.0 + github.com/twmb/franz-go/pkg/kadm v1.12.0 github.com/twmb/franz-go/plugin/kslog v1.0.0 github.com/twpayne/go-geos v0.17.1 github.com/urfave/cli/v3 v3.0.0-alpha9 diff --git a/flow/go.sum b/flow/go.sum index 6ec593ec40..5b8b6c3a07 100644 --- a/flow/go.sum +++ 
b/flow/go.sum @@ -389,6 +389,8 @@ github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk= github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= +github.com/twmb/franz-go/pkg/kadm v1.12.0 h1:I8P/gpXFzhl73QcAYmJu+1fOXvrynyH/MAotr2udEg4= +github.com/twmb/franz-go/pkg/kadm v1.12.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0= github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA= github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU= github.com/twmb/franz-go/plugin/kslog v1.0.0 h1:I64oEmF+0PDvmyLgwrlOtg4mfpSE9GwlcLxM4af2t60= diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index bcf1ac050e..6c7501b0a3 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -29,21 +29,6 @@ func PeerFlowTaskQueueName(taskQueueID shared.TaskQueueID) string { return fmt.Sprintf("%s-%s", deploymentUID, taskQueueID) } -// PEERDB_CDC_CHANNEL_BUFFER_SIZE -func PeerDBCDCChannelBufferSize() int { - return getEnvInt("PEERDB_CDC_CHANNEL_BUFFER_SIZE", 1<<18) -} - -// PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS -func PeerDBQueueFlushTimeoutSeconds() time.Duration { - x := getEnvInt("PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS", 10) - return time.Duration(x) * time.Second -} - -func PeerDBQueueParallelism() int { - return getEnvInt("PEERDB_QUEUE_PARALLELISM", 4) -} - // env variable doesn't exist anymore, but tests appear to depend on this // in lieu of an actual value of IdleTimeoutSeconds func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { @@ -56,16 +41,6 @@ func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration { return time.Duration(x) * time.Second } -// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD -func PeerDBCDCDiskSpillRecordsThreshold() int { - 
return getEnvInt("PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD", 1_000_000) -} - -// PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD, negative numbers means memory threshold disabled -func PeerDBCDCDiskSpillMemPercentThreshold() int { - return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) -} - // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum func PeerDBFlowWorkerMaxMemBytes() uint64 { return getEnvUint[uint64]("GOMEMLIMIT", 0) @@ -96,29 +71,6 @@ func PeerDBCatalogDatabase() string { return GetEnvString("PEERDB_CATALOG_DATABASE", "") } -// PEERDB_ENABLE_WAL_HEARTBEAT -func PeerDBEnableWALHeartbeat() bool { - return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false) -} - -// PEERDB_WAL_HEARTBEAT_QUERY -func PeerDBWALHeartbeatQuery() string { - return GetEnvString("PEERDB_WAL_HEARTBEAT_QUERY", `BEGIN; -DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); -CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); -DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); -END;`) -} - -// PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE -func PeerDBEnableParallelSyncNormalize() bool { - return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) -} - -func PeerDBSnowflakeMergeParallelism() int { - return getEnvInt("PEERDB_SNOWFLAKE_MERGE_PARALLELISM", 8) -} - // PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN func PeerDBTelemetryAWSSNSTopicArn() string { return GetEnvString("PEERDB_TELEMETRY_AWS_SNS_TOPIC_ARN", "") @@ -140,8 +92,3 @@ func PeerDBAlertingEmailSenderRegion() string { func PeerDBAlertingEmailSenderReplyToAddresses() string { return GetEnvString("PEERDB_ALERTING_EMAIL_SENDER_REPLY_TO_ADDRESSES", "") } - -// PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME -func PeerDBClickhouseAWSS3BucketName() string { - return GetEnvString("PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME", "") -} diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go new file mode 100644 index 0000000000..928147d4ab --- 
/dev/null +++ b/flow/peerdbenv/dynamicconf.go @@ -0,0 +1,165 @@ +package peerdbenv + +import ( + "context" + "fmt" + "log/slog" + "os" + "strconv" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "golang.org/x/exp/constraints" + + "github.com/PeerDB-io/peer-flow/logger" +) + +func dynLookup(ctx context.Context, key string) (string, error) { + conn, err := GetCatalogConnectionPoolFromEnv(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to get catalog connection pool", slog.Any("error", err)) + return "", fmt.Errorf("failed to get catalog connection pool: %w", err) + } + + var value pgtype.Text + var default_value pgtype.Text + query := "SELECT config_value, config_default_value FROM dynamic_settings WHERE config_name=$1" + err = conn.QueryRow(ctx, query, key).Scan(&value, &default_value) + if err != nil { + if err == pgx.ErrNoRows { + if val, ok := os.LookupEnv(key); ok { + return val, nil + } + } + logger.LoggerFromCtx(ctx).Error("Failed to get key", slog.Any("error", err)) + return "", fmt.Errorf("failed to get key: %w", err) + } + if !value.Valid { + if val, ok := os.LookupEnv(key); ok { + return val, nil + } + return default_value.String, nil + } + return value.String, nil +} + +func dynLookupConvert[T any](ctx context.Context, key string, fn func(string) (T, error)) (T, error) { + value, err := dynLookup(ctx, key) + if err != nil { + var none T + return none, err + } + return fn(value) +} + +func dynamicConfSigned[T constraints.Signed](ctx context.Context, key string) (T, error) { + value, err := dynLookupConvert(ctx, key, func(value string) (int64, error) { + return strconv.ParseInt(value, 10, 64) + }) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to parse as int64", slog.Any("error", err)) + return 0, fmt.Errorf("failed to parse as int64: %w", err) + } + + return T(value), nil +} + +func dynamicConfUnsigned[T constraints.Unsigned](ctx context.Context, key string) (T, error) { + value, err := 
dynLookupConvert(ctx, key, func(value string) (uint64, error) { + return strconv.ParseUint(value, 10, 64) + }) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to parse as uint64", slog.Any("error", err)) + return 0, fmt.Errorf("failed to parse as uint64: %w", err) + } + + return T(value), nil +} + +func dynamicConfBool(ctx context.Context, key string) (bool, error) { + value, err := dynLookupConvert(ctx, key, strconv.ParseBool) + if err != nil { + logger.LoggerFromCtx(ctx).Error("Failed to parse bool", slog.Any("error", err)) + return false, fmt.Errorf("failed to parse bool: %w", err) + } + + return value, nil +} + +// PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD, 0 disables slot lag alerting entirely +func PeerDBSlotLagMBAlertThreshold(ctx context.Context) (uint32, error) { + return dynamicConfUnsigned[uint32](ctx, "PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD") +} + +// PEERDB_ALERTING_GAP_MINUTES, 0 disables all alerting entirely +func PeerDBAlertingGapMinutesAsDuration(ctx context.Context) (time.Duration, error) { + why, err := dynamicConfSigned[int64](ctx, "PEERDB_ALERTING_GAP_MINUTES") + if err != nil { + return 0, err + } + return time.Duration(why) * time.Minute, nil +} + +// PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD, 0 disables open connections alerting entirely +func PeerDBOpenConnectionsAlertThreshold(ctx context.Context) (uint32, error) { + return dynamicConfUnsigned[uint32](ctx, "PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD") +} + +// PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS, for creating target tables with +// partitioning by _PEERDB_SYNCED_AT column +// If true, the target tables will be partitioned by _PEERDB_SYNCED_AT column +// If false, the target tables will not be partitioned +func PeerDBBigQueryEnableSyncedAtPartitioning(ctx context.Context) (bool, error) { + return dynamicConfBool(ctx, "PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS") +} + +func PeerDBCDCChannelBufferSize(ctx context.Context) (int64, error) { + return 
dynamicConfSigned[int64](ctx, "PEERDB_CDC_CHANNEL_BUFFER_SIZE") +} + +func PeerDBQueueFlushTimeoutSeconds(ctx context.Context) (time.Duration, error) { + x, err := dynamicConfSigned[int64](ctx, "PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS") + if err != nil { + return 0, err + } + return time.Duration(x) * time.Second, nil +} + +func PeerDBQueueParallelism(ctx context.Context) (int64, error) { + return dynamicConfSigned[int64](ctx, "PEERDB_QUEUE_PARALLELISM") +} + +func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context) (int64, error) { + return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD") +} + +func PeerDBCDCDiskSpillMemPercentThreshold(ctx context.Context) (int64, error) { + return dynamicConfSigned[int64](ctx, "PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD") +} + +func PeerDBEnableWALHeartbeat(ctx context.Context) (bool, error) { + return dynamicConfBool(ctx, "PEERDB_ENABLE_WAL_HEARTBEAT") +} + +func PeerDBWALHeartbeatQuery(ctx context.Context) (string, error) { + return dynLookup(ctx, "PEERDB_WAL_HEARTBEAT_QUERY") +} + +func PeerDBEnableParallelSyncNormalize(ctx context.Context) (bool, error) { + return dynamicConfBool(ctx, "PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE") +} + +func PeerDBSnowflakeMergeParallelism(ctx context.Context) (int64, error) { + return dynamicConfSigned[int64](ctx, "PEERDB_SNOWFLAKE_MERGE_PARALLELISM") +} + +func PeerDBClickhouseAWSS3BucketName(ctx context.Context) (string, error) { + return dynLookup(ctx, "PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME") +} + +// Kafka has topic auto create as an option, auto.create.topics.enable +// But non-dedicated cluster maybe can't set config, may want peerdb to create topic. 
Similar for PubSub +func PeerDBQueueForceTopicCreation(ctx context.Context) (bool, error) { + return dynamicConfBool(ctx, "PEERDB_QUEUE_FORCE_TOPIC_CREATION") +} diff --git a/flow/peerdbenv/env.go b/flow/peerdbenv/env.go index 27ef13cbc0..790fe84f67 100644 --- a/flow/peerdbenv/env.go +++ b/flow/peerdbenv/env.go @@ -41,22 +41,6 @@ func getEnvUint[T constraints.Unsigned](name string, defaultValue T) T { return T(i) } -// getEnvBool returns the value of the environment variable with the given name -// or defaultValue if the environment variable is not set or is not a valid value. -func getEnvBool(name string, defaultValue bool) bool { - val, ok := os.LookupEnv(name) - if !ok { - return defaultValue - } - - b, err := strconv.ParseBool(val) - if err != nil { - return defaultValue - } - - return b -} - // GetEnvString returns the value of the environment variable with the given name // or defaultValue if the environment variable is not set. func GetEnvString(name string, defaultValue string) string { diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index 51ccaf7f5f..a01a2febc5 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -502,9 +502,7 @@ func CDCFlowWorkflow( maps.Copy(state.SyncFlowOptions.TableNameSchemaMapping, payload.TableNameSchemaMapping) }) - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableParallelSyncNormalize() - }) + parallel := getParallelSyncNormalize(ctx, logger) if !parallel { normDoneChan := model.NormalizeDoneSignal.GetSignalChannel(ctx) normDoneChan.Drain() diff --git a/flow/workflows/local_activities.go b/flow/workflows/local_activities.go new file mode 100644 index 0000000000..c5a25c8ea4 --- /dev/null +++ b/flow/workflows/local_activities.go @@ -0,0 +1,25 @@ +package peerflow + +import ( + "log/slog" + "time" + + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/workflow" + + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +func getParallelSyncNormalize(wCtx 
workflow.Context, logger log.Logger) bool { + checkCtx := workflow.WithLocalActivityOptions(wCtx, workflow.LocalActivityOptions{ + StartToCloseTimeout: time.Minute, + }) + + getParallelFuture := workflow.ExecuteLocalActivity(checkCtx, peerdbenv.PeerDBEnableParallelSyncNormalize) + var parallel bool + if err := getParallelFuture.Get(checkCtx, ¶llel); err != nil { + logger.Warn("Failed to get status of parallel sync-normalize", slog.Any("error", err)) + return false + } + return parallel +} diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index e4ed09e072..0fa039e509 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -9,7 +9,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -108,9 +107,7 @@ func NormalizeFlowWorkflow( } if ctx.Err() == nil && !state.Stop { - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableParallelSyncNormalize() - }) + parallel := getParallelSyncNormalize(ctx, logger) if !parallel { _ = model.NormalizeDoneSignal.SignalExternalWorkflow( diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 75d9946526..71628d2917 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -201,11 +201,11 @@ func (q *QRepPartitionFlowExecution) replicatePartitions(ctx workflow.Context, ) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 24 * 5 * time.Hour, - HeartbeatTimeout: time.Minute, + HeartbeatTimeout: 5 * time.Minute, RetryPolicy: &temporal.RetryPolicy{ InitialInterval: time.Minute, BackoffCoefficient: 2., - MaximumInterval: time.Hour, + MaximumInterval: 10 * time.Minute, MaximumAttempts: 0, NonRetryableErrorTypes: nil, }, diff --git a/flow/workflows/scheduled_flows.go b/flow/workflows/scheduled_flows.go index 
61db3d0262..7aae7d7c70 100644 --- a/flow/workflows/scheduled_flows.go +++ b/flow/workflows/scheduled_flows.go @@ -5,8 +5,6 @@ import ( "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/workflow" - - "github.com/PeerDB-io/peer-flow/peerdbenv" ) // RecordSlotSizeWorkflow monitors replication slot size @@ -47,18 +45,13 @@ func withCronOptions(ctx workflow.Context, workflowID string, cron string) workf func GlobalScheduleManagerWorkflow(ctx workflow.Context) error { info := workflow.GetInfo(ctx) - walHeartbeatEnabled := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableWALHeartbeat() - }) - if walHeartbeatEnabled { - heartbeatCtx := withCronOptions(ctx, - "wal-heartbeat-"+info.OriginalRunID, - "*/12 * * * *") - workflow.ExecuteChildWorkflow( - heartbeatCtx, - HeartbeatFlowWorkflow, - ) - } + heartbeatCtx := withCronOptions(ctx, + "wal-heartbeat-"+info.OriginalRunID, + "*/12 * * * *") + workflow.ExecuteChildWorkflow( + heartbeatCtx, + HeartbeatFlowWorkflow, + ) slotSizeCtx := withCronOptions(ctx, "record-slot-size-"+info.OriginalRunID, diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 711928ef5c..f126b721d2 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -11,7 +11,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -77,9 +76,7 @@ func SyncFlowWorkflow( }) var waitSelector workflow.Selector - parallel := GetSideEffect(ctx, func(_ workflow.Context) bool { - return peerdbenv.PeerDBEnableParallelSyncNormalize() - }) + parallel := getParallelSyncNormalize(ctx, logger) if !parallel { waitSelector = workflow.NewNamedSelector(ctx, "NormalizeWait") waitSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) diff --git a/nexus/.cargo/config.toml b/nexus/.cargo/config.toml deleted file mode 100644 index bff29e6e17..0000000000 --- 
a/nexus/.cargo/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[build] -rustflags = ["--cfg", "tokio_unstable"] diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index f1d56e39ba..b0b3b18d59 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -127,9 +127,9 @@ dependencies = [ [[package]] name = "anstyle-query" -version = "1.0.3" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5" +checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" dependencies = [ "windows-sys 0.52.0", ] @@ -244,7 +244,7 @@ dependencies = [ "futures-util", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.28", + "hyper 0.14.29", "itoa", "matchit", "memchr", @@ -590,9 +590,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.4" +version = "4.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" +checksum = "a9689a29b593160de5bc4aacab7b5d54fb52231de70122626c178e6a368994c7" dependencies = [ "clap_builder", "clap_derive", @@ -600,9 +600,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.2" +version = "4.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" +checksum = "2e5387378c84f6faa26890ebf9f0a92989f8873d4d380467bcd0d8d8620424df" dependencies = [ "anstream", "anstyle", @@ -612,9 +612,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.4" +version = "4.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" +checksum = "c780290ccf4fb26629baa7a1081e68ced113f1d3ec302fa5948f1c381ebf06c6" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -624,9 +624,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.0" +version = "0.7.1" source 
= "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" [[package]] name = "cmake" @@ -643,43 +643,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" -[[package]] -name = "console-api" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" -dependencies = [ - "futures-core", - "prost", - "prost-types", - "tonic 0.10.2", - "tracing-core", -] - -[[package]] -name = "console-subscriber" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" -dependencies = [ - "console-api", - "crossbeam-channel", - "crossbeam-utils", - "futures-task", - "hdrhistogram", - "humantime", - "prost-types", - "serde", - "serde_json", - "thread_local", - "tokio", - "tokio-stream", - "tonic 0.10.2", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "const-oid" version = "0.9.6" @@ -1207,19 +1170,6 @@ dependencies = [ "allocator-api2", ] -[[package]] -name = "hdrhistogram" -version = "7.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" -dependencies = [ - "base64 0.21.7", - "byteorder", - "flate2", - "nom", - "num-traits", -] - [[package]] name = "heck" version = "0.4.1" @@ -1321,17 +1271,11 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hyper" -version = "0.14.28" +version = "0.14.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" dependencies = [ "bytes", "futures-channel", @@ -1378,7 +1322,7 @@ checksum = "399c78f9338483cb7e630c8474b07268983c6bd5acee012e4211f9f7bb21b070" dependencies = [ "futures-util", "http 0.2.12", - "hyper 0.14.28", + "hyper 0.14.29", "log", "rustls 0.22.4", "rustls-native-certs", @@ -1410,7 +1354,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper 0.14.28", + "hyper 0.14.29", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -1662,15 +1606,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "matchers" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" -dependencies = [ - "regex-automata 0.1.10", -] - [[package]] name = "matchit" version = "0.7.3" @@ -2181,7 +2116,6 @@ dependencies = [ "cargo-deb", "catalog", "clap", - "console-subscriber", "dashmap", "dotenvy", "flow-rs", @@ -2382,7 +2316,7 @@ version = "0.1.0" dependencies = [ "anyhow", "pt", - "rustls 0.23.8", + "rustls 0.23.9", "tokio", "tokio-postgres", "tokio-postgres-rustls", @@ -2562,7 +2496,7 @@ dependencies = [ "serde", "serde_json", "sqlparser", - "tonic 0.11.0", + "tonic", "tonic-reflection", ] @@ -2726,17 +2660,8 @@ checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.6", - "regex-syntax 0.8.3", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] @@ -2747,15 +2672,9 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.3", + "regex-syntax", ] -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.8.3" @@ -2936,9 +2855,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.8" +version = "0.23.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79adb16721f56eb2d843e67676896a61ce7a0fa622dc18d3e372477a029d2740" +checksum = "a218f0f6d05669de4eabfb24f31ce802035c952429d037507b4a4a39f0e60c5b" dependencies = [ "log", "once_cell", @@ -3410,9 +3329,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tar" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" +checksum = "cb797dad5fb5b76fcf519e702f4a589483b5ef06567f160c392832c1f5e44909" dependencies = [ "filetime", "libc", @@ -3524,7 +3443,6 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "tracing", "windows-sys 0.48.0", ] @@ -3582,7 +3500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04fb792ccd6bbcd4bba408eb8a292f70fc4a3589e5d793626f45190e6454b6ab" dependencies = [ "ring", - "rustls 0.23.8", + "rustls 0.23.9", "tokio", "tokio-postgres", "tokio-rustls 0.26.0", @@ -3606,7 +3524,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.8", + "rustls 0.23.9", "rustls-pki-types", "tokio", ] @@ -3637,14 +3555,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.13" +version = "0.8.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4e43f8cc456c9704c851ae29c67e17ef65d2c30017c17a9765b89c382dc8bba" +checksum = "6f49eb2ab21d2f26bd6db7bf383edc527a7ebaee412d17af4d40fdccd442f335" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.22.13", + "toml_edit 0.22.14", ] [[package]] @@ -3669,42 +3587,15 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.13" +version = "0.22.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c127785850e8c20836d49732ae6abfa47616e60bf9d9f57c43c250361a9db96c" +checksum = "f21c7aaf97f1bd9ca9d4f9e73b0a6c74bd5afef56f2bc931943a6e1c37e04e38" dependencies = [ "indexmap 2.2.6", "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.9", -] - -[[package]] -name = "tonic" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64 0.21.7", - "bytes", - "h2", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.28", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost", - "tokio", - "tokio-stream", - "tower", - "tower-layer", - "tower-service", - "tracing", + "winnow 0.6.13", ] [[package]] @@ -3721,7 +3612,7 @@ dependencies = [ "h2", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.28", + "hyper 0.14.29", "hyper-timeout", "percent-encoding", "pin-project", @@ -3744,7 +3635,7 @@ dependencies = [ "prost", "tokio", "tokio-stream", - "tonic 0.11.0", + "tonic", ] [[package]] @@ -3757,7 +3648,7 @@ dependencies = [ "prost-types", "tokio", "tokio-stream", - "tonic 0.11.0", + "tonic", ] [[package]] @@ 
-3853,14 +3744,10 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ - "matchers", "nu-ansi-term", - "once_cell", - "regex", "sharded-slab", "smallvec", "thread_local", - "tracing", "tracing-core", "tracing-log", ] @@ -3917,9 +3804,9 @@ checksum = "e4259d9d4425d9f0661581b804cb85fe66a4c631cadd8f490d1c13a35d5d9291" [[package]] name = "unicode-width" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" +checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" [[package]] name = "untrusted" @@ -4129,9 +4016,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +checksum = "3c452ad30530b54a4d8e71952716a212b08efd0f3562baa66c29a618b07da7c3" dependencies = [ "rustls-pki-types", ] @@ -4337,9 +4224,9 @@ dependencies = [ [[package]] name = "winnow" -version = "0.6.9" +version = "0.6.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86c949fede1d13936a99f14fafd3e76fd642b556dd2ce96287fbe2e0151bfac6" +checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1" dependencies = [ "memchr", ] @@ -4402,7 +4289,7 @@ dependencies = [ "base64 0.21.7", "futures", "http 0.2.12", - "hyper 0.14.28", + "hyper 0.14.29", "hyper-rustls 0.25.0", "itertools 0.12.1", "log", diff --git a/nexus/catalog/migrations/V29__dynconf_table_and_settings.sql b/nexus/catalog/migrations/V29__dynconf_table_and_settings.sql new file mode 100644 index 0000000000..adfcf167bd --- /dev/null +++ b/nexus/catalog/migrations/V29__dynconf_table_and_settings.sql @@ -0,0 +1,10 @@ +ALTER TABLE alerting_settings 
RENAME TO dynamic_settings; +ALTER TABLE dynamic_settings ADD COLUMN config_default_value TEXT, ADD COLUMN config_value_type INT, ADD COLUMN config_description TEXT, ADD COLUMN config_apply_mode INT; +ALTER TABLE dynamic_settings ALTER COLUMN config_value DROP NOT NULL; + +INSERT INTO dynamic_settings (config_name,config_value,config_default_value,config_value_type,config_description,config_apply_mode) +VALUES +('PEERDB_ALERTING_GAP_MINUTES',null,'15',3,'Duration in minutes before reraising alerts, 0 disables all alerting entirely',1), +('PEERDB_SLOT_LAG_MB_ALERT_THRESHOLD',null,'5000',3,'Lag (in MB) threshold on PeerDB slot to start sending alerts, 0 disables slot lag alerting entirely',1), +('PEERDB_PGPEER_OPEN_CONNECTIONS_ALERT_THRESHOLD',null,'5',3,'Open connections from PeerDB user threshold to start sending alerts, 0 disables open connections alerting entirely',1), +('PEERDB_BIGQUERY_ENABLE_SYNCED_AT_PARTITIONING_BY_DAYS',null,'false',4,'BigQuery only: create target tables with partitioning by _PEERDB_SYNCED_AT column',4); diff --git a/nexus/catalog/migrations/V30__more_dynconf_settings.sql b/nexus/catalog/migrations/V30__more_dynconf_settings.sql new file mode 100644 index 0000000000..71656d8922 --- /dev/null +++ b/nexus/catalog/migrations/V30__more_dynconf_settings.sql @@ -0,0 +1,18 @@ +INSERT INTO dynamic_settings (config_name,config_default_value,config_value_type,config_description,config_apply_mode) +VALUES +('PEERDB_CDC_CHANNEL_BUFFER_SIZE','262144',2,'Advanced setting: changes buffer size of channel PeerDB uses while streaming rows read to destination in CDC',1), +('PEERDB_QUEUE_FLUSH_TIMEOUT_SECONDS','10',2,'Frequency of flushing to queue, applicable for PeerDB Streams mirrors only',1), +('PEERDB_QUEUE_PARALLELISM','4',2,'Parallelism for Lua script processing data, applicable for CDC mirrors to Kafka and PubSub',1), +('PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD','1000000',2,'CDC: number of records beyond which records are written to disk instead',1),
+('PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD','-1',2,'CDC: worker memory usage (in %) beyond which records are written to disk instead, -1 disables',1), +('PEERDB_ENABLE_WAL_HEARTBEAT','false',4,'enables WAL heartbeat to prevent replication slot lag from increasing during times of no activity',1), +('PEERDB_WAL_HEARTBEAT_QUERY','BEGIN; +DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); +CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); +DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); +END;',1,'SQL statement to run during each WAL heartbeat',1), +('PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE','false',4,'Advanced setting: enables experimental parallel sync (moving rows to target) and normalize (updating rows in target table)',2), +('PEERDB_SNOWFLAKE_MERGE_PARALLELISM','8',2,'Number of MERGE statements to run in parallel, applies to CDC mirrors with Snowflake targets. -1 means no limit',1), +('PEERDB_CLICKHOUSE_AWS_S3_BUCKET_NAME','',1,'S3 buckets to store Avro files for mirrors with ClickHouse target',1), +('PEERDB_QUEUE_FORCE_TOPIC_CREATION','false',4,'Force auto topic creation in mirrors, applies to Kafka and PubSub mirrors',4) + ON CONFLICT DO NOTHING; diff --git a/nexus/postgres-connection/src/lib.rs b/nexus/postgres-connection/src/lib.rs index 4328978adf..7b2591687a 100644 --- a/nexus/postgres-connection/src/lib.rs +++ b/nexus/postgres-connection/src/lib.rs @@ -91,13 +91,11 @@ pub async fn connect_postgres(config: &PostgresConfig) -> anyhow::Result TracerGuards { - let console_layer = console_subscriber::spawn(); - // also log to peerdb.log in log_dir let file_appender = tracing_appender::rolling::never(log_dir, "peerdb.log"); let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); @@ -1145,16 +1143,9 @@ fn setup_tracing(log_dir: &str) -> TracerGuards { let fmt_stdout_layer = fmt::layer().with_target(false).with_writer(std::io::stdout); - // add min tracing as info - let filter_layer = 
EnvFilter::try_from_default_env() - .or_else(|_| EnvFilter::try_new("info")) - .unwrap(); - tracing_subscriber::registry() - .with(console_layer) .with(fmt_stdout_layer) .with(fmt_file_layer) - .with(filter_layer) .init(); // return guard so that the file appender is not dropped @@ -1225,55 +1216,52 @@ pub async fn main() -> anyhow::Result<()> { let (mut socket, _) = tokio::select! { _ = sigintstream.recv() => return Ok(()), v = listener.accept() => v, - } - .unwrap(); + }?; let conn_flow_handler = flow_handler.clone(); let conn_peer_conns = peer_conns.clone(); let peerdb_fdw_mode = args.peerdb_fwd_mode == "true"; let authenticator_ref = authenticator.make(); let pg_config = catalog_config.to_postgres_config(); - tokio::task::Builder::new() - .name("tcp connection handler") - .spawn(async move { - match Catalog::new(pg_config).await { - Ok(catalog) => { - let conn_uuid = uuid::Uuid::new_v4(); - let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns); - - let processor = Arc::new(NexusBackend::new( - Arc::new(catalog), - tracker, - conn_flow_handler, - peerdb_fdw_mode, - )); - process_socket( - socket, - None, - authenticator_ref, - processor.clone(), - processor, - ) - .await - } - Err(e) => { - tracing::error!("Failed to connect to catalog: {}", e); - - let mut buf = BytesMut::with_capacity(1024); - buf.put_u8(b'E'); - buf.put_i32(0); - buf.put(&b"FATAL"[..]); - buf.put_u8(0); - write!(buf, "Failed to connect to catalog: {e}").ok(); - buf.put_u8(0); - buf.put_u8(b'\0'); - - socket.write_all(&buf).await?; - socket.shutdown().await?; - - Ok(()) - } + tokio::task::spawn(async move { + match Catalog::new(pg_config).await { + Ok(catalog) => { + let conn_uuid = uuid::Uuid::new_v4(); + let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns); + + let processor = Arc::new(NexusBackend::new( + Arc::new(catalog), + tracker, + conn_flow_handler, + peerdb_fdw_mode, + )); + process_socket( + socket, + None, + authenticator_ref, + processor.clone(), + 
processor, + ) + .await } - })?; + Err(e) => { + tracing::error!("Failed to connect to catalog: {}", e); + + let mut buf = BytesMut::with_capacity(1024); + buf.put_u8(b'E'); + buf.put_i32(0); + buf.put(&b"FATAL"[..]); + buf.put_u8(0); + write!(buf, "Failed to connect to catalog: {e}").ok(); + buf.put_u8(0); + buf.put_u8(b'\0'); + + socket.write_all(&buf).await?; + socket.shutdown().await?; + + Ok(()) + } + } + }); } } diff --git a/protos/flow.proto b/protos/flow.proto index 4d4343f71a..c7f8e4fa25 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -404,3 +404,23 @@ message ExportTxSnapshotOutput { bool supports_tid_scans = 2; } +enum DynconfValueType { + UNKNOWN = 0; + STRING = 1; + INT = 2; + UINT = 3; + BOOL = 4; +} + +enum DynconfApplyMode { + APPLY_MODE_UNKNOWN = 0; + // should apply immediately + APPLY_MODE_IMMEDIATE = 1; + // should apply after the mirror is paused and resumed + APPLY_MODE_AFTER_RESUME = 2; + // should apply after pod is restarted + APPLY_MODE_RESTART = 3; + // only applies to newly created mirrors + APPLY_MODE_NEW_MIRROR = 4; +} + diff --git a/stacks/flow.Dockerfile b/stacks/flow.Dockerfile index 60b164b93f..945cadf6c3 100644 --- a/stacks/flow.Dockerfile +++ b/stacks/flow.Dockerfile @@ -18,7 +18,7 @@ WORKDIR /root/flow ENV CGO_ENABLED=1 RUN go build -ldflags="-s -w" -o /root/peer-flow -FROM alpine:3.19 AS flow-base +FROM alpine:3.20 AS flow-base RUN apk add --no-cache ca-certificates geos && \ adduser -s /bin/sh -D peerdb USER peerdb diff --git a/stacks/peerdb-server.Dockerfile b/stacks/peerdb-server.Dockerfile index ac85f63dce..c8acd0c5ed 100644 --- a/stacks/peerdb-server.Dockerfile +++ b/stacks/peerdb-server.Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1 -FROM lukemathwalker/cargo-chef:latest-rust-alpine3.19 as chef +FROM lukemathwalker/cargo-chef:latest-rust-alpine3.20 as chef WORKDIR /root FROM chef as planner @@ -21,7 +21,7 @@ COPY protos /root/protos WORKDIR /root/nexus RUN CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse 
cargo build --release --bin peerdb-server -FROM alpine:3.19 +FROM alpine:3.20 RUN apk add --no-cache ca-certificates postgresql-client curl iputils && \ adduser -s /bin/sh -D peerdb && \ install -d -m 0755 -o peerdb /var/log/peerdb diff --git a/ui/app/api/mirrors/errors/route.ts b/ui/app/api/mirrors/errors/route.ts index f56dc98b7d..b0113395d2 100644 --- a/ui/app/api/mirrors/errors/route.ts +++ b/ui/app/api/mirrors/errors/route.ts @@ -10,19 +10,27 @@ export async function POST(request: Request) { const alertReq: MirrorLogsRequest = body; const skip = (alertReq.page - 1) * alertReq.numPerPage; - const mirrorErrors: MirrorLog[] = await prisma.flow_errors.findMany({ - where: { - OR: [ - { - flow_name: { - contains: alertReq.flowJobName, + const whereClause: any = alertReq.flowJobName + ? { + OR: [ + { + flow_name: { + contains: alertReq.flowJobName, + }, }, - }, - { - flow_name: alertReq.flowJobName, - }, - ], - }, + { + flow_name: alertReq.flowJobName, + }, + ], + } + : {}; + + if (alertReq.natureOfLog && alertReq.natureOfLog !== 'ALL') { + whereClause['error_type'] = alertReq.natureOfLog.toLowerCase(); + } + + const mirrorErrors: MirrorLog[] = await prisma.flow_errors.findMany({ + where: whereClause, orderBy: { error_timestamp: 'desc', }, @@ -32,16 +40,13 @@ export async function POST(request: Request) { error_message: true, error_type: true, error_timestamp: true, - ack: true, }, take: alertReq.numPerPage, skip, }); const total = await prisma.flow_errors.count({ - where: { - flow_name: alertReq.flowJobName, - }, + where: whereClause, }); const alertRes: MirrorLogsResponse = { diff --git a/ui/app/api/mirrors/names/route.ts b/ui/app/api/mirrors/names/route.ts new file mode 100644 index 0000000000..0a9a0726a0 --- /dev/null +++ b/ui/app/api/mirrors/names/route.ts @@ -0,0 +1,22 @@ +import prisma from '@/app/utils/prisma'; +export const dynamic = 'force-dynamic'; + +export async function GET(request: Request) { + const mirrorNames = await prisma.flow_errors.findMany({ + 
select: { + flow_name: true, + }, + // where flow_name is not like 'clone_%' + where: { + NOT: { + flow_name: { + startsWith: 'clone_', + }, + }, + }, + distinct: ['flow_name'], + }); + return new Response( + JSON.stringify(mirrorNames.map((mirror) => mirror.flow_name)) + ); +} diff --git a/ui/app/api/settings/route.ts b/ui/app/api/settings/route.ts new file mode 100644 index 0000000000..318d5e0e3b --- /dev/null +++ b/ui/app/api/settings/route.ts @@ -0,0 +1,47 @@ +import prisma from '@/app/utils/prisma'; +import { dynamic_settings } from '@prisma/client'; + +export async function GET() { + try { + const configs: dynamic_settings[] = + await prisma.dynamic_settings.findMany(); + const serializedConfigs = configs.map((config) => ({ + ...config, + id: config.id, + })); + return new Response(JSON.stringify(serializedConfigs)); + } catch (error) { + console.error('Error fetching dynamic settings:', error); + return new Response( + JSON.stringify({ error: 'Failed to fetch dynamic settings' }), + { status: 500 } + ); + } +} + +export async function POST(request: Request) { + try { + const configReq: dynamic_settings = await request.json(); + const updateRes = await prisma.dynamic_settings.update({ + where: { + id: configReq.id, + }, + data: { + config_value: configReq.config_value, + }, + }); + + let updateStatus: 'success' | 'error' = 'error'; + if (updateRes.id) { + updateStatus = 'success'; + } + + return new Response(updateStatus); + } catch (error) { + console.error('Error updating dynamic setting:', error); + return new Response( + JSON.stringify({ error: 'Failed to update dynamic setting' }), + { status: 500 } + ); + } +} diff --git a/ui/app/dto/AlertDTO.ts b/ui/app/dto/AlertDTO.ts index ee7277f962..85b58fa694 100644 --- a/ui/app/dto/AlertDTO.ts +++ b/ui/app/dto/AlertDTO.ts @@ -6,8 +6,16 @@ export type UAlertConfigResponse = { service_config: Prisma.JsonValue; }; +export enum LogType { + ERROR = 'ERROR', + WARNING = 'WARNING', + INFO = 'INFO', + ALL = 'ALL', +} + 
export type MirrorLogsRequest = { flowJobName: string; + natureOfLog?: LogType; page: number; numPerPage: number; }; @@ -17,7 +25,6 @@ export type MirrorLog = { error_message: string; error_type: string; error_timestamp: Date; - ack: boolean; }; export type MirrorLogsResponse = { diff --git a/ui/app/dto/MirrorsDTO.ts b/ui/app/dto/MirrorsDTO.ts index 7bf9b0fac2..23133bc589 100644 --- a/ui/app/dto/MirrorsDTO.ts +++ b/ui/app/dto/MirrorsDTO.ts @@ -49,3 +49,10 @@ export type MirrorRowsData = { deleteCount: number; totalCount: number; }; + +export type MirrorLogsType = { + flow_name: string; + error_message: string; + error_type: string; + error_timestamp: Date; +}[]; diff --git a/ui/app/mirror-logs/layout.tsx b/ui/app/mirror-logs/layout.tsx new file mode 100644 index 0000000000..3968fee83d --- /dev/null +++ b/ui/app/mirror-logs/layout.tsx @@ -0,0 +1,11 @@ +import SidebarComponent from '@/components/SidebarComponent'; +import { Layout } from '@/lib/Layout'; +import { PropsWithChildren, Suspense } from 'react'; + +export default function PageLayout({ children }: PropsWithChildren) { + return ( + }> + {children} + + ); +} diff --git a/ui/app/mirror-logs/page.tsx b/ui/app/mirror-logs/page.tsx new file mode 100644 index 0000000000..e18c2847d0 --- /dev/null +++ b/ui/app/mirror-logs/page.tsx @@ -0,0 +1,20 @@ +import { Header } from '@/lib/Header'; +import LogsView from './table'; + +const MirrorLogs = async () => { + return ( +
+
Logs
+ +
+ ); +}; + +export default MirrorLogs; diff --git a/ui/app/mirror-logs/table.tsx b/ui/app/mirror-logs/table.tsx new file mode 100644 index 0000000000..7e6c4d6367 --- /dev/null +++ b/ui/app/mirror-logs/table.tsx @@ -0,0 +1,122 @@ +'use client'; + +import { + LogType, + MirrorLog, + MirrorLogsRequest, + MirrorLogsResponse, +} from '@/app/dto/AlertDTO'; +import LogsTable from '@/components/LogsTable'; +import { ProgressCircle } from '@/lib/ProgressCircle'; +import { useEffect, useState } from 'react'; +import ReactSelect from 'react-select'; +import 'react-toastify/dist/ReactToastify.css'; +import useSWR from 'swr'; +import { useLocalStorage } from 'usehooks-ts'; +import { fetcher } from '../utils/swr'; + +export default function LogsView() { + const [logs, setLogs] = useState([]); + const [mirrorName, setMirrorName] = useLocalStorage( + 'peerdbMirrorNameFilterForLogs', + '' + ); + const [natureOfLog, setNatureOfLog] = useLocalStorage( + 'peerdbLogTypeFilterForLogs', + LogType.ALL + ); + const [currentPage, setCurrentPage] = useState(1); + const [totalPages, setTotalPages] = useState(1); + const { data: mirrors }: { data: string[]; error: any } = useSWR( + '/api/mirrors/names', + fetcher + ); + + useEffect(() => { + setCurrentPage(1); + }, [mirrorName]); + + useEffect(() => { + const req: MirrorLogsRequest = { + natureOfLog: natureOfLog, + flowJobName: mirrorName, + page: currentPage, + numPerPage: 15, + }; + + const fetchData = async () => { + try { + const response = await fetch('/api/mirrors/errors', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: MirrorLogsResponse = await response.json(); + const numPages = Math.ceil(data.total / req.numPerPage); + setLogs(data.errors); + setTotalPages(numPages); + } catch (error) { + console.error('Error fetching mirror logs:', error); + } + }; + + fetchData(); + }, [currentPage, mirrorName, natureOfLog]); + + if (!mirrors) { + 
return ; + } + return ( +
+
+
+ ({ + value: mirror, + label: mirror, + }))} + onChange={(selectedOption) => + setMirrorName(selectedOption?.value ?? '') + } + placeholder='Filter by mirror' + /> +
+
+ ({ + value: type, + label: type, + }))} + onChange={(selectedOption) => + setNatureOfLog(selectedOption?.value ?? LogType.ALL) + } + placeholder='Filter by log type' + /> +
+
+ +
+ ); +} diff --git a/ui/app/mirrors/create/cdc/fields.tsx b/ui/app/mirrors/create/cdc/fields.tsx index 3a1f8064b2..1c4bb3ace0 100644 --- a/ui/app/mirrors/create/cdc/fields.tsx +++ b/ui/app/mirrors/create/cdc/fields.tsx @@ -78,6 +78,7 @@ const CDCField = ({ getOptionValue={(option) => option.option} theme={SelectTheme} isLoading={optionsLoading} + isClearable={true} /> {setting.tips && ( diff --git a/ui/app/mirrors/create/cdc/schemabox.tsx b/ui/app/mirrors/create/cdc/schemabox.tsx index 4ec403f8ee..35ae2bf95d 100644 --- a/ui/app/mirrors/create/cdc/schemabox.tsx +++ b/ui/app/mirrors/create/cdc/schemabox.tsx @@ -1,4 +1,5 @@ 'use client'; + import { TableMapRow } from '@/app/dto/MirrorsDTO'; import { DBType } from '@/grpc_generated/peers'; import { Checkbox } from '@/lib/Checkbox'; @@ -12,12 +13,14 @@ import { Dispatch, SetStateAction, useCallback, + useEffect, useMemo, useState, } from 'react'; import { BarLoader } from 'react-spinners/'; import { fetchColumns, fetchTables } from '../handlers'; import ColumnBox from './columnbox'; +import { SchemaSettings } from './schemasettings'; import { expandableStyle, schemaBoxStyle, @@ -37,6 +40,7 @@ interface SchemaBoxProps { peerType?: DBType; omitAdditionalTables: string[] | undefined; } + const SchemaBox = ({ sourcePeer, peerType, @@ -51,6 +55,8 @@ const SchemaBox = ({ const [columnsLoading, setColumnsLoading] = useState(false); const [expandedSchemas, setExpandedSchemas] = useState([]); const [tableQuery, setTableQuery] = useState(''); + const [defaultTargetSchema, setDefaultTargetSchema] = + useState(schema); const searchedTables = useMemo(() => { const tableQueryLower = tableQuery.toLowerCase(); @@ -146,19 +152,7 @@ const SchemaBox = ({ setExpandedSchemas((curr) => [...curr, schemaName]); if (rowsDoNotHaveSchemaTables(schemaName)) { - setTablesLoading(true); - fetchTables(sourcePeer, schemaName, peerType).then((newRows) => { - for (const row of newRows) { - if (omitAdditionalTables?.includes(row.source)) { - 
row.canMirror = false; - } - } - setRows((oldRows) => [ - ...oldRows.filter((oldRow) => oldRow.schema !== schema), - ...newRows, - ]); - setTablesLoading(false); - }); + fetchTablesForSchema(schemaName); } } else { setExpandedSchemas((curr) => @@ -167,6 +161,34 @@ const SchemaBox = ({ } }; + const fetchTablesForSchema = useCallback( + (schemaName: string) => { + setTablesLoading(true); + fetchTables(sourcePeer, schemaName, defaultTargetSchema, peerType).then( + (newRows) => { + for (const row of newRows) { + if (omitAdditionalTables?.includes(row.source)) { + row.canMirror = false; + } + } + setRows((oldRows) => { + const filteredRows = oldRows.filter( + (oldRow) => oldRow.schema !== schemaName + ); + const updatedRows = [...filteredRows, ...newRows]; + return updatedRows; + }); + setTablesLoading(false); + } + ); + }, + [setRows, sourcePeer, defaultTargetSchema, peerType, omitAdditionalTables] + ); + + useEffect(() => { + fetchTablesForSchema(schema); + }, [schema, fetchTablesForSchema]); + return (
@@ -200,6 +222,12 @@ const SchemaBox = ({ setTableQuery(e.target.value) } /> +
+ +
{/* TABLE BOX */} @@ -273,7 +301,7 @@ const SchemaBox = ({ }} variant='simple' placeholder={'Enter target table'} - defaultValue={row.destination} + value={row.destination} onChange={(e: React.ChangeEvent) => updateDestination(row.source, e.target.value) } diff --git a/ui/app/mirrors/create/cdc/schemasettings.tsx b/ui/app/mirrors/create/cdc/schemasettings.tsx new file mode 100644 index 0000000000..3a2b1e580f --- /dev/null +++ b/ui/app/mirrors/create/cdc/schemasettings.tsx @@ -0,0 +1,85 @@ +import { Icon } from '@/lib/Icon'; +import * as Popover from '@radix-ui/react-popover'; +import { useState } from 'react'; + +export const SchemaSettings = ({ + schema, + setTargetSchemaOverride, +}: { + schema: string; + setTargetSchemaOverride: (schema: string) => void; +}) => { + const [inputValue, setInputValue] = useState(schema); + const [savedIndicator, setSavedIndicator] = useState(false); + + const handleInputChange = (e: React.ChangeEvent) => { + setInputValue(e.target.value); + }; + + const handleSave = () => { + setTargetSchemaOverride(inputValue); + setSavedIndicator(true); + setTimeout(() => setSavedIndicator(false), 3000); + }; + + return ( + + +
+ +
+
+ + + +
+

Schema On Target

+ + + {savedIndicator && ( + + success + + )} +
+
+
+
+ ); +}; diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 5df96cfb76..72ee1d5486 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -309,12 +309,19 @@ const getDefaultDestinationTable = ( peerType.toString() == 'BIGQUERY' || dBTypeToJSON(peerType) == 'BIGQUERY' ) { - return tableName; + if (schemaName.length === 0) { + return tableName; + } + return `${schemaName}_${tableName}`; } + if ( peerType.toString() == 'CLICKHOUSE' || dBTypeToJSON(peerType) == 'CLICKHOUSE' ) { + if (schemaName.length === 0) { + return tableName; + } return `${schemaName}_${tableName}`; } @@ -325,12 +332,17 @@ const getDefaultDestinationTable = ( return `.${schemaName}_${tableName}.`; } + if (schemaName.length === 0) { + return tableName; + } + return `${schemaName}.${tableName}`; }; export const fetchTables = async ( peerName: string, schemaName: string, + targetSchemaName: string, peerType?: DBType ) => { if (schemaName.length === 0) return []; @@ -351,7 +363,7 @@ export const fetchTables = async ( // for bigquery, tables are not schema-qualified const dstName = getDefaultDestinationTable( peerType!, - schemaName, + targetSchemaName, tableObject.tableName ); tables.push({ diff --git a/ui/app/mirrors/create/qrep/qrep.tsx b/ui/app/mirrors/create/qrep/qrep.tsx index 8488b5a766..ab5a0d4926 100644 --- a/ui/app/mirrors/create/qrep/qrep.tsx +++ b/ui/app/mirrors/create/qrep/qrep.tsx @@ -2,7 +2,6 @@ import SelectTheme from '@/app/styles/select'; import { RequiredIndicator } from '@/components/RequiredIndicator'; import { QRepConfig, QRepWriteType } from '@/grpc_generated/flow'; -import { DBType } from '@/grpc_generated/peers'; import { Label } from '@/lib/Label'; import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; import { Switch } from '@/lib/Switch'; @@ -105,17 +104,6 @@ export default function QRepConfigForm({ ) => { if (val) { if (setting.label.includes('Table')) { - if 
(mirrorConfig.destinationPeer?.type === DBType.BIGQUERY) { - setter((curr) => ({ - ...curr, - destinationTableIdentifier: val.split('.')[1], - })); - } else { - setter((curr) => ({ - ...curr, - destinationTableIdentifier: val, - })); - } loadColumnOptions(val); } handleChange(val, setting); diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index e2fad6b8b5..441f04fe0d 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -5,27 +5,13 @@ import { MirrorLogsRequest, MirrorLogsResponse, } from '@/app/dto/AlertDTO'; -import TimeLabel from '@/components/TimeComponent'; -import { Button } from '@/lib/Button'; -import { Icon } from '@/lib/Icon'; +import LogsTable from '@/components/LogsTable'; import { Label } from '@/lib/Label'; -import { Table, TableCell, TableRow } from '@/lib/Table'; import { useParams } from 'next/navigation'; import { useEffect, useState } from 'react'; import { ToastContainer } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; -const colorForErrorType = (errorType: string) => { - const errorUpper = errorType.toUpperCase(); - if (errorUpper === 'ERROR') { - return '#F45156'; - } else if (errorUpper === 'WARNING') { - return '#FFC107'; - } else { - return '#4CAF50'; - } -}; - export default function MirrorError() { const params = useParams<{ mirrorName: string }>(); const [mirrorErrors, setMirrorErrors] = useState([]); @@ -65,18 +51,6 @@ export default function MirrorError() { fetchData(); }, [currentPage, params.mirrorName]); - const handleNextPage = () => { - if (currentPage < totalPages) { - setCurrentPage(currentPage + 1); - } - }; - - const handlePrevPage = () => { - if (currentPage > 1) { - setCurrentPage(currentPage - 1); - } - }; - return ( <>
@@ -98,61 +72,12 @@ export default function MirrorError() {
- - Type - - - - Message - - - } - toolbar={{ - left: ( -
- - - - -
- ), - }} - > - {mirrorErrors.map((mirrorError, idx) => ( - - - {mirrorError.error_type.toUpperCase()} - - - - - - {mirrorError.error_message} - - - ))} -
+ diff --git a/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx b/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx index 02e0a758b6..9620cac16e 100644 --- a/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx +++ b/ui/app/mirrors/status/qrep/[mirrorId]/page.tsx @@ -33,7 +33,7 @@ export default async function QRepMirrorStatus({ startTime: run.start_time, endTime: run.end_time, pulledRows: run.rows_in_partition, - syncedRows: run.rows_synced, + syncedRows: Number(run.rows_synced), }; return ret; }); diff --git a/ui/app/settings/layout.tsx b/ui/app/settings/layout.tsx new file mode 100644 index 0000000000..69a53b44ea --- /dev/null +++ b/ui/app/settings/layout.tsx @@ -0,0 +1,7 @@ +import SidebarComponent from '@/components/SidebarComponent'; +import { Layout } from '@/lib/Layout'; +import { PropsWithChildren } from 'react'; + +export default function PageLayout({ children }: PropsWithChildren) { + return }>{children}; +} diff --git a/ui/app/settings/page.tsx b/ui/app/settings/page.tsx new file mode 100644 index 0000000000..e84ddd31f2 --- /dev/null +++ b/ui/app/settings/page.tsx @@ -0,0 +1,281 @@ +'use client'; + +import { DynconfApplyMode, DynconfValueType } from '@/grpc_generated/flow'; +import { Button } from '@/lib/Button'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import { SearchField } from '@/lib/SearchField'; +import { Table, TableCell, TableRow } from '@/lib/Table'; +import { TextField } from '@/lib/TextField'; +import { Tooltip } from '@/lib/Tooltip'; +import { dynamic_settings } from '@prisma/client'; +import { MaterialSymbol } from 'material-symbols'; +import { useEffect, useMemo, useState } from 'react'; +import { ToastContainer } from 'react-toastify'; +import 'react-toastify/dist/ReactToastify.css'; +import { notifyErr } from '../utils/notify'; + +const ROWS_PER_PAGE = 7; + +const ApplyModeIconWithTooltip = ({ applyMode }: { applyMode: number }) => { + let tooltipText = ''; + let iconName: MaterialSymbol = 'help'; + + switch 
(applyMode) { + case DynconfApplyMode.APPLY_MODE_IMMEDIATE: + tooltipText = 'Changes to this configuration will apply immediately'; + iconName = 'bolt'; + break; + case DynconfApplyMode.APPLY_MODE_AFTER_RESUME: + tooltipText = 'Changes to this configuration will apply after resume'; + iconName = 'cached'; + break; + case DynconfApplyMode.APPLY_MODE_RESTART: + tooltipText = + 'Changes to this configuration will apply after server restart.'; + iconName = 'restart_alt'; + break; + case DynconfApplyMode.APPLY_MODE_NEW_MIRROR: + tooltipText = + 'Changes to this configuration will apply only to new mirrors'; + iconName = 'new_window'; + break; + default: + tooltipText = 'Unknown apply mode'; + iconName = 'help'; + } + + return ( +
+ + + +
+ ); +}; + +const DynamicSettingItem = ({ + setting, + onSettingUpdate, +}: { + setting: dynamic_settings; + onSettingUpdate: () => void; +}) => { + const [editMode, setEditMode] = useState(false); + const [newValue, setNewValue] = useState(setting.config_value); + + const handleEdit = () => { + setEditMode(true); + }; + + const validateNewValue = (): boolean => { + const notNullValue = newValue ?? ''; + if (setting.config_value_type === DynconfValueType.INT) { + const a = parseInt(Number(notNullValue).toString()); + if ( + isNaN(a) || + a > Number.MAX_SAFE_INTEGER || + a < Number.MIN_SAFE_INTEGER + ) { + notifyErr('Invalid value. Please enter a valid 64-bit signed integer.'); + return false; + } + return true; + } else if (setting.config_value_type === DynconfValueType.UINT) { + const a = parseInt(Number(notNullValue).toString()); + if (isNaN(a) || a > Number.MAX_SAFE_INTEGER || a < 0) { + notifyErr( + 'Invalid value. Please enter a valid 64-bit unsigned integer.' + ); + return false; + } + return true; + } else if (setting.config_value_type === DynconfValueType.BOOL) { + if (notNullValue !== 'true' && notNullValue !== 'false') { + notifyErr('Invalid value. Please enter true or false.'); + return false; + } + return true; + } else if (setting.config_value_type === DynconfValueType.STRING) { + return true; + } else { + notifyErr('Invalid value type'); + return false; + } + }; + + const handleSave = async () => { + if (!validateNewValue() || newValue === setting.config_value) { + setNewValue(setting.config_value); + setEditMode(false); + return; + } + const updatedSetting = { ...setting, config_value: newValue }; + await fetch('/api/settings', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(updatedSetting), + }); + setEditMode(false); + onSettingUpdate(); + }; + + return ( + + + + + + {editMode ? ( +
+ setNewValue(e.target.value)} + variant='simple' + /> + +
+ ) : ( +
+ {setting.config_value || 'N/A'} + +
+ )} +
+ + {setting.config_default_value || 'N/A'} + + + {setting.config_description || 'N/A'} + + + + +
+ ); +}; + +const SettingsPage = () => { + const [settings, setSettings] = useState([]); + const [currentPage, setCurrentPage] = useState(1); + const [searchQuery, setSearchQuery] = useState(''); + const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('asc'); + const sortField = 'config_name'; + + const fetchSettings = async () => { + const response = await fetch('/api/settings'); + const data = await response.json(); + setSettings(data); + }; + + useEffect(() => { + fetchSettings(); + }, []); + + const filteredSettings = useMemo(() => { + return settings.filter((setting) => + setting.config_name.toLowerCase().includes(searchQuery.toLowerCase()) + ); + }, [settings, searchQuery]); + const totalPages = Math.ceil(filteredSettings.length / ROWS_PER_PAGE); + const displayedSettings = useMemo(() => { + filteredSettings.sort((a, b) => { + const aValue = a[sortField]; + const bValue = b[sortField]; + if (aValue === null || bValue === null) return 0; + if (aValue < bValue) return sortDir === 'dsc' ? 1 : -1; + if (aValue > bValue) return sortDir === 'dsc' ? -1 : 1; + return 0; + }); + + const startRow = (currentPage - 1) * ROWS_PER_PAGE; + const endRow = startRow + ROWS_PER_PAGE; + return filteredSettings.slice(startRow, endRow); + }, [filteredSettings, currentPage, sortDir]); + + const handlePrevPage = () => { + if (currentPage > 1) setCurrentPage(currentPage - 1); + }; + + const handleNextPage = () => { + if (currentPage < totalPages) setCurrentPage(currentPage + 1); + }; + + return ( +
+ Settings List} + toolbar={{ + left: ( +
+ + + + + + +
+ ), + right: ( + setSearchQuery(e.target.value)} + /> + ), + }} + header={ + + {[ + { header: 'Configuration Name', width: '35%' }, + { header: 'Current Value', width: '10%' }, + { header: 'Default Value', width: '10%' }, + { header: 'Description', width: '35%' }, + { header: 'Apply Mode', width: '10%' }, + ].map(({ header, width }) => ( + + {header} + + ))} + + } + > + {displayedSettings.map((setting) => ( + + ))} +
+ +
+ ); +}; + +export default SettingsPage; diff --git a/ui/components/LogsTable.tsx b/ui/components/LogsTable.tsx new file mode 100644 index 0000000000..3b65f89f28 --- /dev/null +++ b/ui/components/LogsTable.tsx @@ -0,0 +1,115 @@ +import { MirrorLog } from '@/app/dto/AlertDTO'; +import TimeLabel from '@/components/TimeComponent'; +import { Button } from '@/lib/Button'; +import { Icon } from '@/lib/Icon'; +import { Label } from '@/lib/Label'; +import { Table, TableCell, TableRow } from '@/lib/Table'; +import 'react-toastify/dist/ReactToastify.css'; + +const colorForErrorType = (errorType: string) => { + const errorUpper = errorType.toUpperCase(); + if (errorUpper === 'ERROR') { + return '#F45156'; + } else if (errorUpper === 'WARNING') { + return '#FFC107'; + } else { + return '#4CAF50'; + } +}; + +const extractFromCloneName = (mirrorOrCloneName: string) => { + if (mirrorOrCloneName.includes('clone_')) { + return mirrorOrCloneName.split('_')[1] + ' (initial load)'; + } + return mirrorOrCloneName; +}; + +const LogsTable = ({ + logs, + currentPage, + totalPages, + setCurrentPage, +}: { + logs: MirrorLog[]; + currentPage: number; + totalPages: number; + setCurrentPage: (page: number) => void; +}) => { + const handleNextPage = () => { + if (currentPage < totalPages) { + setCurrentPage(currentPage + 1); + } + }; + const handlePrevPage = () => { + if (currentPage > 1) { + setCurrentPage(currentPage - 1); + } + }; + + return ( + + Type + + + + Mirror + Message + + + } + toolbar={{ + left: ( +
+ + + + +
+ ), + }} + > + {logs.map((log, idx) => ( + + + {log.error_type.toUpperCase()} + + + + + + + + + {log.error_message} + + + ))} +
+ ); +}; + +export default LogsTable; diff --git a/ui/components/SidebarComponent.tsx b/ui/components/SidebarComponent.tsx index 3985c7dcaa..5be2e128f6 100644 --- a/ui/components/SidebarComponent.tsx +++ b/ui/components/SidebarComponent.tsx @@ -150,6 +150,20 @@ export default function SidebarComponent() { > {sidebarState === 'open' && 'Scripts'} + } + > + {sidebarState === 'open' && 'Logs'} + + } + > + {sidebarState === 'open' && 'Settings'} + ); diff --git a/ui/lib/Table/TableCell.styles.ts b/ui/lib/Table/TableCell.styles.ts index 5a4eba2d37..c9c1e257f1 100644 --- a/ui/lib/Table/TableCell.styles.ts +++ b/ui/lib/Table/TableCell.styles.ts @@ -11,6 +11,11 @@ const variants = { min-width: ${({ theme }) => theme.size.xxSmall}; max-width: ${({ theme }) => theme.size.xSmall}; `, + mirror_name: css` + overflow: auto; + min-width: ${({ theme }) => theme.size.small}; + max-width: ${({ theme }) => theme.size.medium}; + `, }; export type TableCellVariant = keyof typeof variants; @@ -18,7 +23,6 @@ type BaseTableCellProps = { $variant: TableCellVariant; }; export const BaseTableCell = styled.td` - border-collapse: collapse; overflow: hidden; ${({ $variant }) => variants[$variant]} `; diff --git a/ui/prisma/schema.prisma b/ui/prisma/schema.prisma index 4e3309aa80..f792406ac8 100644 --- a/ui/prisma/schema.prisma +++ b/ui/prisma/schema.prisma @@ -122,7 +122,7 @@ model qrep_partitions { restart_count Int metadata Json? id Int @id @default(autoincrement()) - rows_synced Int? + rows_synced BigInt? 
qrep_runs qrep_runs @relation(fields: [flow_name, run_uuid], references: [flow_name, run_uuid], onDelete: Cascade, onUpdate: NoAction, map: "fk_qrep_partitions_run") @@unique([run_uuid, partition_uuid]) @@ -213,20 +213,14 @@ model schema_deltas_audit_log { @@schema("peerdb_stats") } -model alerting_settings { - id Int @id(map: "alerting_settings_pkey1") @default(autoincrement()) - config_name String - config_value String - - @@schema("public") -} - model dynamic_settings { - id Int @id(map: "alerting_settings_pkey") @default(autoincrement()) - config_name String @unique(map: "idx_alerting_settings_config_name") - config_value String - setting_description String? - needs_restart Boolean? + id Int @id(map: "alerting_settings_pkey") @default(autoincrement()) + config_name String @unique(map: "idx_alerting_settings_config_name") + config_value String? + config_default_value String? + config_value_type Int? + config_description String? + config_apply_mode Int? @@schema("public") } @@ -261,6 +255,19 @@ model scripts { @@schema("public") } +model ch_s3_stage { + id Int @id @default(autoincrement()) + flow_job_name String + sync_batch_id BigInt + avro_file Json + created_at DateTime @default(now()) @db.Timestamptz(6) + + @@unique([flow_job_name, sync_batch_id]) + @@index([flow_job_name]) + @@index([sync_batch_id]) + @@schema("public") +} + enum script_lang { lua diff --git a/ui/tsconfig.json b/ui/tsconfig.json index c443fefcce..46009ed0ae 100644 --- a/ui/tsconfig.json +++ b/ui/tsconfig.json @@ -1,6 +1,6 @@ { "compilerOptions": { - "target": "es6", + "target": "ES2020", "lib": ["dom", "dom.iterable", "esnext"], "allowJs": true, "skipLibCheck": true,