diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 303066f119..52c6d705f2 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -47,12 +47,12 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3 + uses: github/codeql-action/init@f09c1c0a94de965c15400f5634aa42fac8fb8f88 # v3 with: languages: ${{ matrix.language }} build-mode: ${{ matrix.build-mode }} - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3 + uses: github/codeql-action/analyze@f09c1c0a94de965c15400f5634aa42fac8fb8f88 # v3 with: category: "/language:${{matrix.language}}" diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index e794512a04..b7c2256143 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -25,7 +25,7 @@ jobs: POSTGRES_DB: postgres POSTGRES_INITDB_ARGS: --locale=C.UTF-8 elasticsearch: - image: elasticsearch:8.16.0@sha256:a411f7c17549209c5839b69f929de00bd91f1e2dcf08b65d5f41b122eae17f5e + image: elasticsearch:8.16.1@sha256:e5ee5f8dacbf18fa3ab59a098cc7d4d69f73e61637eb45f1c029e74b1cb200a1 ports: - 9200:9200 env: diff --git a/README.md b/README.md index ebd4579f19..63aa2fe271 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,14 @@ PeerDB is an ETL/ELT tool built for PostgreSQL. We implement multiple Postgres n **From a feature richness standpoint**, we support efficient syncing of tables with large (TOAST) columns. We support multiple streaming modes - Log based (CDC) based, Query based streaming etc. We provide rich data-type mapping and plan to support every possible (incl. Custom types) that Postgres supports to the best extent possible on the target data-store. +### Now available natively in ClickHouse Cloud (Private Preview) + +PeerDB is now available natively in ClickHouse Cloud (Private Preview). Learn more about it [here](https://clickhouse.com/cloud/clickpipes/postgres-cdc-connector). + + + + + #### **Postgres-compatible SQL interface to do ETL** The Postgres-compatible SQL interface for ETL is unique to PeerDB and enables you to operate in a language you are familiar with. You can do ETL the same way you work with your databases. diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 98ee987b36..6459c0b131 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -39,7 +39,7 @@ x-flow-worker-env: &flow-worker-env services: catalog: container_name: catalog - image: postgres:17-alpine@sha256:0d9624535618a135c5453258fd629f4963390338b11aaffb92292c12df3a6c17 + image: postgres:17-alpine@sha256:e7897baa70dae1968d23d785adb4aeb699175e0bcaae44f98a7083ecb9668b93 command: -c config_file=/etc/postgresql.conf ports: - 9901:5432 diff --git a/docker-compose.yml b/docker-compose.yml index 4d714e9973..c7991b9e22 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -32,7 +32,7 @@ x-flow-worker-env: &flow-worker-env services: catalog: container_name: catalog - image: postgres:17-alpine@sha256:0d9624535618a135c5453258fd629f4963390338b11aaffb92292c12df3a6c17 + image: postgres:17-alpine@sha256:e7897baa70dae1968d23d785adb4aeb699175e0bcaae44f98a7083ecb9668b93 command: -c config_file=/etc/postgresql.conf restart: unless-stopped ports: @@ -112,7 +112,7 @@ services: flow-api: container_name: flow_api - image: ghcr.io/peerdb-io/flow-api:latest-stable@sha256:53a6de3d7537b4a90b4ff13d822d0a9fa3015857fc739fc2497d33f33b05dfaa + image: ghcr.io/peerdb-io/flow-api:stable-v0.19.1@sha256:a759b2d1b14f11d09ade672c268abcb456fd8884468547ea0f467cdfb60a0994 restart: unless-stopped ports: - 8112:8112 @@ -128,7 +128,7 @@ services: flow-snapshot-worker: container_name: flow-snapshot-worker - image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-stable@sha256:036d6091e32c9d15f2738bc6aab312aa1f412f5c06c57687b497cde233b73d4c + image: ghcr.io/peerdb-io/flow-snapshot-worker:stable-v0.19.1@sha256:894c1fea1cf9a4f5622420d8630509243b60cf177e107ec4d14d7294a9490451 restart: unless-stopped environment: <<: [*catalog-config, *flow-worker-env, *minio-config] @@ -138,7 +138,7 @@ services: flow-worker: container_name: flow-worker - image: ghcr.io/peerdb-io/flow-worker:latest-stable@sha256:f5d4d5e4e44336d6917e3c8b3d753c77d813d5d1e55ca7fb4d3a3d3d1d3253cc + image: ghcr.io/peerdb-io/flow-worker:stable-v0.19.1@sha256:4482314bd3bd4a96930fbee10c00a9f2d5764e86cfd8802642589d339cf04054 restart: unless-stopped environment: <<: [*catalog-config, *flow-worker-env, *minio-config] @@ -151,7 +151,7 @@ services: peerdb: container_name: peerdb-server stop_signal: SIGINT - image: ghcr.io/peerdb-io/peerdb-server:latest-stable@sha256:15249fc45b8b5384fb7a046bc73f75cc679c570a3d2fd3fd8c40c7d7e85f7eef + image: ghcr.io/peerdb-io/peerdb-server:stable-v0.19.1@sha256:c736500e0b42f100df29af43ecf4c96d0c8f4805dd294fecd0bb4ce7b7897a18 restart: unless-stopped environment: <<: *catalog-config @@ -167,7 +167,7 @@ services: peerdb-ui: container_name: peerdb-ui - image: ghcr.io/peerdb-io/peerdb-ui:latest-stable@sha256:f4d1cdf966eb06f4a4a03db4b02593b44c8a37bd32143c937d3c59c2586c4bb1 + image: ghcr.io/peerdb-io/peerdb-ui:stable-v0.19.1@sha256:ffc4b5960dc1653a59e680c61fca0ba2c5891cb4965e4662927d9886f4d7f6bc restart: unless-stopped ports: - 3000:3000 diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index cc09bae0d7..ed9d2b720d 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -28,7 +28,6 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/otel_metrics" - "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/pua" "github.com/PeerDB-io/peer-flow/shared" @@ -287,11 +286,13 @@ func (a *FlowableActivity) MaintainPull( ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) srcConn, err := connectors.GetByNameAs[connectors.CDCPullConnector](ctx, config.Env, a.CatalogPool, config.SourceName) if err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err } defer connectors.CloseConnector(ctx, srcConn) if err := srcConn.SetupReplConn(ctx); err != nil { + a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return err } @@ -407,7 +408,7 @@ func (a *FlowableActivity) StartNormalize( if errors.Is(err, errors.ErrUnsupported) { return nil, monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID) } else if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get normalize connector: %w", err) } defer connectors.CloseConnector(ctx, dstConn) @@ -418,7 +419,7 @@ func (a *FlowableActivity) StartNormalize( tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, input.FlowConnectionConfigs.FlowJobName) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get table name schema mapping: %w", err) } res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{ @@ -436,13 +437,13 @@ func (a *FlowableActivity) StartNormalize( } dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get peer type: %w", err) } if dstType == protos.DBType_POSTGRES { err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to update end time for cdc batch: %w", err) } } @@ -757,11 +758,10 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { return } - slotMetricGauges := peerdb_gauges.SlotMetricGauges{} + slotMetricGauges := otel_metrics.SlotMetricGauges{} if a.OtelManager != nil { - slotLagGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter, - a.OtelManager.Float64GaugesCache, - peerdb_gauges.BuildGaugeName(peerdb_gauges.SlotLagGaugeName), + slotLagGauge, err := a.OtelManager.GetOrInitFloat64Gauge( + otel_metrics.BuildMetricName(otel_metrics.SlotLagGaugeName), metric.WithUnit("MiBy"), metric.WithDescription("Postgres replication slot lag in MB")) if err != nil { @@ -770,9 +770,8 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { } slotMetricGauges.SlotLagGauge = slotLagGauge - openConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter, - a.OtelManager.Int64GaugesCache, - peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenConnectionsGaugeName), + openConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge( + otel_metrics.BuildMetricName(otel_metrics.OpenConnectionsGaugeName), metric.WithDescription("Current open connections for PeerDB user")) if err != nil { logger.Error("Failed to get open connections gauge", slog.Any("error", err)) @@ -780,9 +779,8 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { } slotMetricGauges.OpenConnectionsGauge = openConnectionsGauge - openReplicationConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter, - a.OtelManager.Int64GaugesCache, - peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenReplicationConnectionsGaugeName), + openReplicationConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge( + otel_metrics.BuildMetricName(otel_metrics.OpenReplicationConnectionsGaugeName), metric.WithDescription("Current open replication connections for PeerDB user")) if err != nil { logger.Error("Failed to get open replication connections gauge", slog.Any("error", err)) @@ -790,9 +788,8 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error { } slotMetricGauges.OpenReplicationConnectionsGauge = openReplicationConnectionsGauge - intervalSinceLastNormalizeGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter, - a.OtelManager.Float64GaugesCache, - peerdb_gauges.BuildGaugeName(peerdb_gauges.IntervalSinceLastNormalizeGaugeName), + intervalSinceLastNormalizeGauge, err := a.OtelManager.GetOrInitFloat64Gauge( + otel_metrics.BuildMetricName(otel_metrics.IntervalSinceLastNormalizeGaugeName), metric.WithUnit("s"), metric.WithDescription("Interval since last normalize")) if err != nil { diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index d583044b0c..2d1f7e1f3e 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -23,6 +23,7 @@ import ( "github.com/PeerDB-io/peer-flow/connectors/utils/monitoring" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -113,7 +114,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon options *protos.SyncFlowOptions, sessionID string, adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error), - pull func(TPull, context.Context, *pgxpool.Pool, *model.PullRecordsRequest[Items]) error, + pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error, sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error), ) (*model.SyncCompositeResponse, error) { flowName := config.FlowJobName @@ -181,7 +182,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon startTime := time.Now() errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { - return pull(srcConn, errCtx, a.CatalogPool, &model.PullRecordsRequest[Items]{ + return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{ FlowJobName: flowName, SrcTableIDNameMapping: options.SrcTableIdNameMapping, TableNameMapping: tblNameMapping, diff --git a/flow/alerting/alerting.go b/flow/alerting/alerting.go index 5f05005d14..d1394561fd 100644 --- a/flow/alerting/alerting.go +++ b/flow/alerting/alerting.go @@ -21,6 +21,7 @@ import ( "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" "github.com/PeerDB-io/peer-flow/shared/telemetry" + "github.com/PeerDB-io/peer-flow/tags" ) // alerting service, no cool name :( @@ -356,7 +357,7 @@ func (a *Alerter) checkAndAddAlertToCatalog(ctx context.Context, alertConfigId i return true } - logger.Info(fmt.Sprintf("Skipped sending alerts: last alert was sent at %s, which was >=%s ago", createdTimestamp.String(), dur.String())) + logger.Info(fmt.Sprintf("Skipped sending alerts: last alert was sent at %s, which was <=%s ago", createdTimestamp.String(), dur.String())) return false } @@ -366,13 +367,24 @@ func (a *Alerter) sendTelemetryMessage( flowName string, more string, level telemetry.Level, - tags ...string, + additionalTags ...string, ) { + allTags := []string{flowName, peerdbenv.PeerDBDeploymentUID()} + allTags = append(allTags, additionalTags...) + + if flowTags, err := tags.GetTags(ctx, a.CatalogPool, flowName); err != nil { + logger.Warn("failed to get flow tags", slog.Any("error", err)) + } else { + for key, value := range flowTags { + allTags = append(allTags, fmt.Sprintf("%s:%s", key, value)) + } + } + details := fmt.Sprintf("[%s] %s", flowName, more) attributes := telemetry.Attributes{ Level: level, DeploymentUID: peerdbenv.PeerDBDeploymentUID(), - Tags: append([]string{flowName, peerdbenv.PeerDBDeploymentUID()}, tags...), + Tags: allTags, Type: flowName, } @@ -440,6 +452,10 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error) if errors.As(err, &pgErr) { tags = append(tags, "pgcode:"+pgErr.Code) } + var netErr *net.OpError + if errors.As(err, &netErr) { + tags = append(tags, "err:Net") + } a.sendTelemetryMessage(ctx, logger, flowName, errorWithStack, telemetry.ERROR, tags...) } diff --git a/flow/cmd/tags_handler.go b/flow/cmd/tags_handler.go new file mode 100644 index 0000000000..ddd362c3e4 --- /dev/null +++ b/flow/cmd/tags_handler.go @@ -0,0 +1,84 @@ +package cmd + +import ( + "context" + "fmt" + "log/slog" + + "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/tags" +) + +func (h *FlowRequestHandler) flowExists(ctx context.Context, flowName string) (bool, error) { + var exists bool + err := h.pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)", flowName).Scan(&exists) + if err != nil { + slog.Error("error checking if flow exists", slog.Any("error", err)) + return false, err + } + + slog.Info(fmt.Sprintf("flow %s exists: %t", flowName, exists)) + return exists, nil +} + +func (h *FlowRequestHandler) CreateOrReplaceFlowTags( + ctx context.Context, + in *protos.CreateOrReplaceFlowTagsRequest, +) (*protos.CreateOrReplaceFlowTagsResponse, error) { + flowName := in.FlowName + + exists, err := h.flowExists(ctx, flowName) + if err != nil { + return nil, err + } + + if !exists { + slog.Error("flow does not exist", slog.String("flow_name", flowName)) + return nil, fmt.Errorf("flow %s does not exist", flowName) + } + + tags := make(map[string]string, len(in.Tags)) + for _, tag := range in.Tags { + tags[tag.Key] = tag.Value + } + + _, err = h.pool.Exec(ctx, "UPDATE flows SET tags = $1 WHERE name = $2", tags, flowName) + if err != nil { + slog.Error("error updating flow tags", slog.Any("error", err)) + return nil, err + } + + return &protos.CreateOrReplaceFlowTagsResponse{ + FlowName: flowName, + }, nil +} + +func (h *FlowRequestHandler) GetFlowTags(ctx context.Context, in *protos.GetFlowTagsRequest) (*protos.GetFlowTagsResponse, error) { + flowName := in.FlowName + + exists, err := h.flowExists(ctx, flowName) + if err != nil { + return nil, err + } + + if !exists { + slog.Error("flow does not exist", slog.String("flow_name", flowName)) + return nil, fmt.Errorf("flow %s does not exist", flowName) + } + + tags, err := tags.GetTags(ctx, h.pool, flowName) + if err != nil { + slog.Error("error getting flow tags", slog.Any("error", err)) + return nil, err + } + + protosTags := make([]*protos.FlowTag, 0, len(tags)) + for key, value := range tags { + protosTags = append(protosTags, &protos.FlowTag{Key: key, Value: value}) + } + + return &protos.GetFlowTagsResponse{ + FlowName: flowName, + Tags: protosTags, + }, nil +} diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index cca0202ec7..87fbd0aa54 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -10,7 +10,6 @@ import ( "runtime" "github.com/grafana/pyroscope-go" - "go.opentelemetry.io/otel/metric" "go.temporal.io/sdk/client" temporalotel "go.temporal.io/sdk/contrib/opentelemetry" "go.temporal.io/sdk/worker" @@ -35,9 +34,18 @@ type WorkerSetupOptions struct { } type workerSetupResponse struct { - Client client.Client - Worker worker.Worker - Cleanup func() + Client client.Client + Worker worker.Worker + OtelManager *otel_metrics.OtelManager +} + +func (w *workerSetupResponse) Close() { + w.Client.Close() + if w.OtelManager != nil { + if err := w.OtelManager.Close(context.Background()); err != nil { + slog.Error("Failed to shutdown metrics provider", slog.Any("error", err)) + } + } } func setupPyroscope(opts *WorkerSetupOptions) { @@ -148,26 +156,14 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { }) peerflow.RegisterFlowWorkerWorkflows(w) - cleanupOtelManagerFunc := func() {} var otelManager *otel_metrics.OtelManager if opts.EnableOtelMetrics { - metricsProvider, metricsErr := otel_metrics.SetupPeerDBMetricsProvider("flow-worker") - if metricsErr != nil { - return nil, metricsErr - } - otelManager = &otel_metrics.OtelManager{ - MetricsProvider: metricsProvider, - Meter: metricsProvider.Meter("io.peerdb.flow-worker"), - Float64GaugesCache: make(map[string]metric.Float64Gauge), - Int64GaugesCache: make(map[string]metric.Int64Gauge), - } - cleanupOtelManagerFunc = func() { - shutDownErr := otelManager.MetricsProvider.Shutdown(context.Background()) - if shutDownErr != nil { - slog.Error("Failed to shutdown metrics provider", slog.Any("error", shutDownErr)) - } + otelManager, err = otel_metrics.NewOtelManager() + if err != nil { + return nil, fmt.Errorf("unable to create otel manager: %w", err) } } + w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, Alerter: alerting.NewAlerter(context.Background(), conn), @@ -182,11 +178,8 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { }) return &workerSetupResponse{ - Client: c, - Worker: w, - Cleanup: func() { - cleanupOtelManagerFunc() - c.Close() - }, + Client: c, + Worker: w, + OtelManager: otelManager, }, nil } diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go index 63ccea6937..f024a767e4 100644 --- a/flow/connectors/clickhouse/clickhouse.go +++ b/flow/connectors/clickhouse/clickhouse.go @@ -228,11 +228,12 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou tlsSetting.RootCAs = caPool } - var settings clickhouse.Settings + // See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency + settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)} if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil { return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err) } else if maxInsertThreads != 0 { - settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads} + settings["max_insert_threads"] = maxInsertThreads } conn, err := clickhouse.Open(&clickhouse.Options{ diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index fb221096c0..2debe0f4d5 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -474,7 +474,10 @@ func (c *ClickHouseConnector) NormalizeRecords( case queries <- insertIntoSelectQuery.String(): case <-errCtx.Done(): close(queries) - return nil, ctx.Err() + c.logger.Error("[clickhouse] context canceled while normalizing", + slog.Any("error", errCtx.Err()), + slog.Any("cause", context.Cause(errCtx))) + return nil, context.Cause(errCtx) } } close(queries) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 8a6bbbc0e2..073d9d82b4 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -23,7 +23,7 @@ import ( connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" - "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -85,7 +85,7 @@ type CDCPullConnectorCore interface { alerter *alerting.Alerter, catalogPool *pgxpool.Pool, alertKeys *alerting.AlertKeys, - slotMetricGauges peerdb_gauges.SlotMetricGauges, + slotMetricGauges otel_metrics.SlotMetricGauges, ) error // GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector. @@ -102,7 +102,12 @@ type CDCPullConnector interface { CDCPullConnectorCore // This method should be idempotent, and should be able to be called multiple times with the same request. - PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest[model.RecordItems]) error + PullRecords( + ctx context.Context, + catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, + req *model.PullRecordsRequest[model.RecordItems], + ) error } type CDCPullPgConnector interface { @@ -110,7 +115,12 @@ type CDCPullPgConnector interface { // This method should be idempotent, and should be able to be called multiple times with the same request. // It's signature, aside from type parameter, should match CDCPullConnector.PullRecords. - PullPg(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest[model.PgItems]) error + PullPg( + ctx context.Context, + catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, + req *model.PullRecordsRequest[model.PgItems], + ) error } type NormalizedTablesConnector interface { diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 9f4bd0b966..27ae89904c 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -14,6 +14,8 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "github.com/lib/pq/oid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.temporal.io/sdk/activity" connmetadata "github.com/PeerDB-io/peer-flow/connectors/external_metadata" @@ -22,6 +24,7 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" + "github.com/PeerDB-io/peer-flow/otel_metrics" "github.com/PeerDB-io/peer-flow/shared" ) @@ -41,12 +44,14 @@ type PostgresCDCSource struct { // for storing schema delta audit logs to catalog catalogPool *pgxpool.Pool + otelManager *otel_metrics.OtelManager hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{} flowJobName string } type PostgresCDCConfig struct { CatalogPool *pgxpool.Pool + OtelManager *otel_metrics.OtelManager SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude TableNameSchemaMapping map[string]*protos.TableSchema @@ -71,6 +76,7 @@ func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) * commitLock: nil, childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, catalogPool: cdcConfig.CatalogPool, + otelManager: cdcConfig.OtelManager, flowJobName: cdcConfig.FlowJobName, hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}), } @@ -331,8 +337,7 @@ func PullCdcRecords[Items model.Items]( records.SignalAsEmpty() } logger.Info(fmt.Sprintf("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len())) - err := cdcRecordsStorage.Close() - if err != nil { + if err := cdcRecordsStorage.Close(); err != nil { logger.Warn("failed to clean up records storage", slog.Any("error", err)) } }() @@ -361,6 +366,16 @@ func PullCdcRecords[Items model.Items]( return nil } + var fetchedBytesCounter metric.Int64Counter + if p.otelManager != nil { + var err error + fetchedBytesCounter, err = p.otelManager.GetOrInitInt64Counter(otel_metrics.BuildMetricName(otel_metrics.FetchedBytesCounterName), + metric.WithUnit("By"), metric.WithDescription("Bytes received of CopyData over replication slot")) + if err != nil { + return fmt.Errorf("could not get FetchedBytesCounter: %w", err) + } + } + pkmRequiresResponse := false waitingForCommit := false @@ -439,8 +454,7 @@ func PullCdcRecords[Items model.Items]( }() cancel() - ctxErr := ctx.Err() - if ctxErr != nil { + if ctxErr := ctx.Err(); ctxErr != nil { return fmt.Errorf("consumeStream preempted: %w", ctxErr) } @@ -463,6 +477,12 @@ func PullCdcRecords[Items model.Items]( continue } + if fetchedBytesCounter != nil { + fetchedBytesCounter.Add(ctx, int64(len(msg.Data)), metric.WithAttributeSet(attribute.NewSet( + attribute.String(otel_metrics.FlowNameKey, req.FlowJobName), + ))) + } + switch msg.Data[0] { case pglogrepl.PrimaryKeepaliveMessageByteID: pkm, err := pglogrepl.ParsePrimaryKeepaliveMessage(msg.Data[1:]) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index ae0dbea52d..14b827cc89 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -28,7 +28,6 @@ import ( "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/otel_metrics" - "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -330,17 +329,19 @@ func (c *PostgresConnector) SetLastOffset(ctx context.Context, jobName string, l func (c *PostgresConnector) PullRecords( ctx context.Context, catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, req *model.PullRecordsRequest[model.RecordItems], ) error { - return pullCore(ctx, c, catalogPool, req, qProcessor{}) + return pullCore(ctx, c, catalogPool, otelManager, req, qProcessor{}) } func (c *PostgresConnector) PullPg( ctx context.Context, catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, req *model.PullRecordsRequest[model.PgItems], ) error { - return pullCore(ctx, c, catalogPool, req, pgProcessor{}) + return pullCore(ctx, c, catalogPool, otelManager, req, pgProcessor{}) } // PullRecords pulls records from the source. @@ -348,6 +349,7 @@ func pullCore[Items model.Items]( ctx context.Context, c *PostgresConnector, catalogPool *pgxpool.Pool, + otelManager *otel_metrics.OtelManager, req *model.PullRecordsRequest[Items], processor replProcessor[Items], ) error { @@ -414,6 +416,7 @@ func pullCore[Items model.Items]( cdc := c.NewPostgresCDCSource(&PostgresCDCConfig{ CatalogPool: catalogPool, + OtelManager: otelManager, SrcTableIDNameMapping: req.SrcTableIDNameMapping, TableNameMapping: req.TableNameMapping, TableNameSchemaMapping: req.TableNameSchemaMapping, @@ -435,8 +438,7 @@ func pullCore[Items model.Items]( return fmt.Errorf("failed to get current LSN: %w", err) } - err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN)) - if err != nil { + if err := monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN)); err != nil { c.logger.Error("error updating latest LSN at source for CDC flow", slog.Any("error", err)) return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err) } @@ -1197,7 +1199,7 @@ func (c *PostgresConnector) HandleSlotInfo( alerter *alerting.Alerter, catalogPool *pgxpool.Pool, alertKeys *alerting.AlertKeys, - slotMetricGauges peerdb_gauges.SlotMetricGauges, + slotMetricGauges otel_metrics.SlotMetricGauges, ) error { logger := shared.LoggerFromCtx(ctx) @@ -1215,12 +1217,16 @@ func (c *PostgresConnector) HandleSlotInfo( logger.Info(fmt.Sprintf("Checking %s lag for %s", alertKeys.SlotName, alertKeys.PeerName), slog.Float64("LagInMB", float64(slotInfo[0].LagInMb))) alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0]) - slotMetricGauges.SlotLagGauge.Record(ctx, float64(slotInfo[0].LagInMb), metric.WithAttributeSet(attribute.NewSet( - attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), - attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), - attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName), - attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())), - )) + + if slotMetricGauges.SlotLagGauge != nil { + slotMetricGauges.SlotLagGauge.Record(ctx, float64(slotInfo[0].LagInMb), metric.WithAttributeSet(attribute.NewSet( + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName), + ))) + } else { + logger.Warn("warning: slotMetricGauges.SlotLagGauge is nil") + } // Also handles alerts for PeerDB user connections exceeding a given limit here res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User) @@ -1229,25 +1235,31 @@ func (c *PostgresConnector) HandleSlotInfo( return err } alerter.AlertIfOpenConnections(ctx, alertKeys, res) - slotMetricGauges.OpenConnectionsGauge.Record(ctx, res.CurrentOpenConnections, metric.WithAttributeSet(attribute.NewSet( - attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), - attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), - attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()), - ))) + if slotMetricGauges.OpenConnectionsGauge != nil { + slotMetricGauges.OpenConnectionsGauge.Record(ctx, res.CurrentOpenConnections, metric.WithAttributeSet(attribute.NewSet( + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + ))) + } else { + logger.Warn("warning: slotMetricGauges.OpenConnectionsGauge is nil") + } replicationRes, err := getOpenReplicationConnectionsForUser(ctx, c.conn, c.config.User) if err != nil { logger.Warn("warning: failed to get current open replication connections", "error", err) return err } - slotMetricGauges.OpenReplicationConnectionsGauge.Record(ctx, replicationRes.CurrentOpenConnections, - metric.WithAttributeSet(attribute.NewSet( - attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), - attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), - attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()), - )), - ) + if slotMetricGauges.OpenReplicationConnectionsGauge != nil { + slotMetricGauges.OpenReplicationConnectionsGauge.Record(ctx, replicationRes.CurrentOpenConnections, + metric.WithAttributeSet(attribute.NewSet( + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + )), + ) + } else { + logger.Warn("warning: slotMetricGauges.OpenReplicationConnectionsGauge is nil") + } var intervalSinceLastNormalize *time.Duration if err := alerter.CatalogPool.QueryRow( @@ -1261,13 +1273,16 @@ func (c *PostgresConnector) HandleSlotInfo( return nil } if intervalSinceLastNormalize != nil { - slotMetricGauges.IntervalSinceLastNormalizeGauge.Record(ctx, intervalSinceLastNormalize.Seconds(), - metric.WithAttributeSet(attribute.NewSet( - attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), - attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), - attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()), - )), - ) + if slotMetricGauges.IntervalSinceLastNormalizeGauge != nil { + slotMetricGauges.IntervalSinceLastNormalizeGauge.Record(ctx, intervalSinceLastNormalize.Seconds(), + metric.WithAttributeSet(attribute.NewSet( + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + )), + ) + } else { + logger.Warn("warning: slotMetricGauges.IntervalSinceLastNormalizeGauge is nil") + } alerter.AlertIfTooLongSinceLastNormalize(ctx, alertKeys, *intervalSinceLastNormalize) } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index f3c915c133..339c54a633 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -212,8 +212,8 @@ func (qe *QRepQueryExecutor) processFetchedRows( if err := rows.Err(); err != nil { stream.Close(err) qe.logger.Error("[pg_query_executor] row iteration failed", - slog.String("query", query), slog.Any("error", rows.Err())) - return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, rows.Err()) + slog.String("query", query), slog.Any("error", err)) + return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, err) } return numRows, nil diff --git a/flow/connectors/utils/cdc_store.go b/flow/connectors/utils/cdc_store.go index e3aa9e4499..d3e9d27f8d 100644 --- a/flow/connectors/utils/cdc_store.go +++ b/flow/connectors/utils/cdc_store.go @@ -115,6 +115,7 @@ func init() { gob.Register(qvalue.QValueArrayTimestamp{}) gob.Register(qvalue.QValueArrayTimestampTZ{}) gob.Register(qvalue.QValueArrayBoolean{}) + gob.Register(qvalue.QValueTSTZRange{}) } func (c *cdcStore[T]) initPebbleDB() error { diff --git a/flow/main.go b/flow/main.go index 9d499e957d..613c426340 100644 --- a/flow/main.go +++ b/flow/main.go @@ -144,7 +144,7 @@ func main() { if err != nil { return err } - defer res.Cleanup() + defer res.Close() return res.Worker.Run(worker.InterruptCh()) }, Flags: []cli.Flag{ diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 3bb2d1f248..054d6a42b1 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -30,8 +30,8 @@ func (s *QRecordStream) Schema() qvalue.QRecordSchema { func (s *QRecordStream) SetSchema(schema qvalue.QRecordSchema) { if !s.schemaSet { s.schema = schema - close(s.schemaLatch) s.schemaSet = true + close(s.schemaLatch) } } diff --git a/flow/otel_metrics/env.go b/flow/otel_metrics/env.go deleted file mode 100644 index 81b5d0c3ea..0000000000 --- a/flow/otel_metrics/env.go +++ /dev/null @@ -1,11 +0,0 @@ -package otel_metrics - -import "github.com/PeerDB-io/peer-flow/peerdbenv" - -func GetPeerDBOtelMetricsNamespace() string { - return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") -} - -func GetPeerDBOtelTemporalMetricsExportListEnv() string { - return peerdbenv.GetEnvString("PEERDB_OTEL_TEMPORAL_METRICS_EXPORT_LIST", "") -} diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index c59adecd41..dc3deb4246 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -17,46 +17,108 @@ import ( "github.com/PeerDB-io/peer-flow/peerdbenv" ) +const ( + SlotLagGaugeName string = "cdc_slot_lag" + OpenConnectionsGaugeName string = "open_connections" + OpenReplicationConnectionsGaugeName string = "open_replication_connections" + IntervalSinceLastNormalizeGaugeName string = "interval_since_last_normalize" + FetchedBytesCounterName string = "fetched_bytes" +) + +type SlotMetricGauges struct { + SlotLagGauge metric.Float64Gauge + OpenConnectionsGauge metric.Int64Gauge + OpenReplicationConnectionsGauge metric.Int64Gauge + IntervalSinceLastNormalizeGauge metric.Float64Gauge + FetchedBytesCounter metric.Int64Counter +} + +func BuildMetricName(baseName string) string { + return peerdbenv.GetPeerDBOtelMetricsNamespace() + baseName +} + type OtelManager struct { MetricsProvider *sdkmetric.MeterProvider Meter metric.Meter Float64GaugesCache map[string]metric.Float64Gauge Int64GaugesCache map[string]metric.Int64Gauge + Int64CountersCache map[string]metric.Int64Counter +} + +func NewOtelManager() (*OtelManager, error) { + metricsProvider, err := SetupPeerDBMetricsProvider("flow-worker") + if err != nil { + return nil, err + } + + return &OtelManager{ + MetricsProvider: metricsProvider, + Meter: metricsProvider.Meter("io.peerdb.flow-worker"), + Float64GaugesCache: make(map[string]metric.Float64Gauge), + Int64GaugesCache: make(map[string]metric.Int64Gauge), + Int64CountersCache: make(map[string]metric.Int64Counter), + }, nil +} + +func (om *OtelManager) Close(ctx context.Context) error { + return om.MetricsProvider.Shutdown(ctx) +} + +func getOrInitMetric[M any, O any]( + cons func(metric.Meter, string, ...O) (M, error), + meter metric.Meter, + cache map[string]M, + name string, + opts ...O, +) (M, error) { + gauge, ok := cache[name] + if !ok { + var err error + gauge, err = cons(meter, name, opts...) + if err != nil { + var none M + return none, err + } + cache[name] = gauge + } + return gauge, nil +} + +func (om *OtelManager) GetOrInitInt64Gauge(name string, opts ...metric.Int64GaugeOption) (metric.Int64Gauge, error) { + return getOrInitMetric(metric.Meter.Int64Gauge, om.Meter, om.Int64GaugesCache, name, opts...) +} + +func (om *OtelManager) GetOrInitFloat64Gauge(name string, opts ...metric.Float64GaugeOption) (metric.Float64Gauge, error) { + return getOrInitMetric(metric.Meter.Float64Gauge, om.Meter, om.Float64GaugesCache, name, opts...) +} + +func (om *OtelManager) GetOrInitInt64Counter(name string, opts ...metric.Int64CounterOption) (metric.Int64Counter, error) { + return getOrInitMetric(metric.Meter.Int64Counter, om.Meter, om.Int64CountersCache, name, opts...) } // newOtelResource returns a resource describing this application. func newOtelResource(otelServiceName string, attrs ...attribute.KeyValue) (*resource.Resource, error) { - allAttrs := []attribute.KeyValue{ + allAttrs := append([]attribute.KeyValue{ semconv.ServiceNameKey.String(otelServiceName), - } - allAttrs = append(allAttrs, attrs...) - r, err := resource.Merge( + attribute.String(DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()), + }, attrs...) + return resource.Merge( resource.Default(), resource.NewWithAttributes( semconv.SchemaURL, allAttrs..., ), ) - - return r, err -} - -func setupHttpOtelMetricsExporter() (sdkmetric.Exporter, error) { - return otlpmetrichttp.New(context.Background()) -} - -func setupGrpcOtelMetricsExporter() (sdkmetric.Exporter, error) { - return otlpmetricgrpc.New(context.Background()) } func temporalMetricsFilteringView() sdkmetric.View { - exportListString := GetPeerDBOtelTemporalMetricsExportListEnv() + exportListString := peerdbenv.GetPeerDBOtelTemporalMetricsExportListEnv() slog.Info("Found export list for temporal metrics", slog.String("exportList", exportListString)) // Special case for exporting all metrics if exportListString == "__ALL__" { return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { stream := sdkmetric.Stream{ - Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Name: BuildMetricName("temporal." + instrument.Name), Description: instrument.Description, Unit: instrument.Unit, } @@ -68,7 +130,7 @@ func temporalMetricsFilteringView() sdkmetric.View { if len(exportList) == 0 { return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { return sdkmetric.Stream{ - Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Name: BuildMetricName("temporal." + instrument.Name), Description: instrument.Description, Unit: instrument.Unit, Aggregation: sdkmetric.AggregationDrop{}, @@ -84,7 +146,7 @@ func temporalMetricsFilteringView() sdkmetric.View { } return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { stream := sdkmetric.Stream{ - Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Name: BuildMetricName("temporal." + instrument.Name), Description: instrument.Description, Unit: instrument.Unit, } @@ -95,16 +157,16 @@ func temporalMetricsFilteringView() sdkmetric.View { } } -func setupExporter() (sdkmetric.Exporter, error) { +func setupExporter(ctx context.Context) (sdkmetric.Exporter, error) { otlpMetricProtocol := peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_PROTOCOL", peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", "http/protobuf")) var metricExporter sdkmetric.Exporter var err error switch otlpMetricProtocol { case "http/protobuf": - metricExporter, err = setupHttpOtelMetricsExporter() + metricExporter, err = otlpmetrichttp.New(ctx) case "grpc": - metricExporter, err = setupGrpcOtelMetricsExporter() + metricExporter, err = otlpmetricgrpc.New(ctx) default: return nil, fmt.Errorf("unsupported otel metric protocol: %s", otlpMetricProtocol) } @@ -114,8 +176,8 @@ func setupExporter() (sdkmetric.Exporter, error) { return metricExporter, err } -func setupMetricsProvider(otelResource *resource.Resource, views ...sdkmetric.View) (*sdkmetric.MeterProvider, error) { - metricExporter, err := setupExporter() +func setupMetricsProvider(ctx context.Context, otelResource *resource.Resource, views ...sdkmetric.View) (*sdkmetric.MeterProvider, error) { + metricExporter, err := setupExporter(ctx) if err != nil { return nil, err } @@ -133,13 +195,13 @@ func SetupPeerDBMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvide if err != nil { return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) } - return setupMetricsProvider(otelResource) + return setupMetricsProvider(context.Background(), otelResource) } func SetupTemporalMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) { - otelResource, err := newOtelResource(otelServiceName, attribute.String(DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())) + otelResource, err := newOtelResource(otelServiceName) if err != nil { return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) } - return setupMetricsProvider(otelResource, temporalMetricsFilteringView()) + return setupMetricsProvider(context.Background(), otelResource, temporalMetricsFilteringView()) } diff --git a/flow/otel_metrics/peerdb_gauges/gauges.go b/flow/otel_metrics/peerdb_gauges/gauges.go deleted file mode 100644 index a3b7d5c3e8..0000000000 --- a/flow/otel_metrics/peerdb_gauges/gauges.go +++ /dev/null @@ -1,25 +0,0 @@ -package peerdb_gauges - -import ( - "go.opentelemetry.io/otel/metric" - - "github.com/PeerDB-io/peer-flow/otel_metrics" -) - -const ( - SlotLagGaugeName string = "cdc_slot_lag" - OpenConnectionsGaugeName string = "open_connections" - OpenReplicationConnectionsGaugeName string = "open_replication_connections" - IntervalSinceLastNormalizeGaugeName string = "interval_since_last_normalize" -) - -type SlotMetricGauges struct { - SlotLagGauge metric.Float64Gauge - OpenConnectionsGauge metric.Int64Gauge - OpenReplicationConnectionsGauge metric.Int64Gauge - IntervalSinceLastNormalizeGauge metric.Float64Gauge -} - -func BuildGaugeName(baseGaugeName string) string { - return otel_metrics.GetPeerDBOtelMetricsNamespace() + baseGaugeName -} diff --git a/flow/otel_metrics/sync_gauges.go b/flow/otel_metrics/sync_gauges.go deleted file mode 100644 index e9da02c875..0000000000 --- a/flow/otel_metrics/sync_gauges.go +++ /dev/null @@ -1,33 +0,0 @@ -package otel_metrics - -import ( - "go.opentelemetry.io/otel/metric" -) - -func GetOrInitInt64SyncGauge(meter metric.Meter, cache map[string]metric.Int64Gauge, name string, opts ...metric.Int64GaugeOption, -) (metric.Int64Gauge, error) { - gauge, ok := cache[name] - if !ok { - var err error - gauge, err = meter.Int64Gauge(name, opts...) - if err != nil { - return nil, err - } - cache[name] = gauge - } - return gauge, nil -} - -func GetOrInitFloat64SyncGauge(meter metric.Meter, cache map[string]metric.Float64Gauge, name string, opts ...metric.Float64GaugeOption, -) (metric.Float64Gauge, error) { - gauge, ok := cache[name] - if !ok { - var err error - gauge, err = meter.Float64Gauge(name, opts...) - if err != nil { - return nil, err - } - cache[name] = gauge - } - return gauge, nil -} diff --git a/flow/peerdbenv/otel.go b/flow/peerdbenv/otel.go new file mode 100644 index 0000000000..d7f3cb68a6 --- /dev/null +++ b/flow/peerdbenv/otel.go @@ -0,0 +1,9 @@ +package peerdbenv + +func GetPeerDBOtelMetricsNamespace() string { + return GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") +} + +func GetPeerDBOtelTemporalMetricsExportListEnv() string { + return GetEnvString("PEERDB_OTEL_TEMPORAL_METRICS_EXPORT_LIST", "") +} diff --git a/flow/tags/tags.go b/flow/tags/tags.go new file mode 100644 index 0000000000..8adc9a437b --- /dev/null +++ b/flow/tags/tags.go @@ -0,0 +1,24 @@ +package tags + +import ( + "context" + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" +) + +func GetTags(ctx context.Context, catalogPool *pgxpool.Pool, flowName string) (map[string]string, error) { + var tags map[string]string + + err := catalogPool.QueryRow(ctx, "SELECT tags FROM flows WHERE name = $1", flowName).Scan(&tags) + if err != nil { + slog.Error("error getting flow tags", slog.Any("error", err)) + return nil, err + } + + if tags == nil { + tags = make(map[string]string) + } + + return tags, nil +} diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index 51bf0091a1..93086157d8 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -92,6 +92,15 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { } } + if input.FlowConnectionConfigs != nil { + err := executeCDCDropActivities(ctx, input) + if err != nil { + workflow.GetLogger(ctx).Error("failed to drop CDC flow", slog.Any("error", err)) + return err + } + workflow.GetLogger(ctx).Info("CDC flow dropped successfully") + } + removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 1 * time.Minute, }) @@ -103,14 +112,5 @@ func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { return err } - if input.FlowConnectionConfigs != nil { - err := executeCDCDropActivities(ctx, input) - if err != nil { - workflow.GetLogger(ctx).Error("failed to drop CDC flow", slog.Any("error", err)) - return err - } - workflow.GetLogger(ctx).Info("CDC flow dropped successfully") - } - return nil } diff --git a/images/in-clickpipes.png b/images/in-clickpipes.png new file mode 100644 index 0000000000..18d4c709d3 Binary files /dev/null and b/images/in-clickpipes.png differ diff --git a/nexus/Cargo.lock b/nexus/Cargo.lock index f827849ff0..15a8c32743 100644 --- a/nexus/Cargo.lock +++ b/nexus/Cargo.lock @@ -1790,7 +1790,7 @@ dependencies = [ "http 1.1.0", "hyper 1.5.0", "hyper-util", - "rustls 0.23.17", + "rustls 0.23.18", "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio", @@ -3044,7 +3044,7 @@ dependencies = [ "anyhow", "futures-util", "pt", - "rustls 0.23.17", + "rustls 0.23.18", "ssh2", "tokio", "tokio-postgres", @@ -3270,7 +3270,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.0.0", - "rustls 0.23.17", + "rustls 0.23.18", "socket2", "thiserror", "tokio", @@ -3287,7 +3287,7 @@ dependencies = [ "rand", "ring", "rustc-hash 2.0.0", - "rustls 0.23.17", + "rustls 0.23.18", "slab", "thiserror", "tinyvec", @@ -3305,7 +3305,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3518,7 +3518,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.17", + "rustls 0.23.18", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -3583,9 +3583,9 @@ dependencies = [ [[package]] name = "rsa" -version = "0.9.6" +version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc" +checksum = "47c75d7c5c6b673e58bf54d8544a9f432e3a925b0e80f7cd3602ab5c50c55519" dependencies = [ "const-oid", "digest", @@ -3686,9 +3686,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.17" +version = "0.23.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f1a745511c54ba6d4465e8d5dfbd81b45791756de28d4981af70d6dca128f1e" +checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" dependencies = [ "log", "once_cell", @@ -4411,7 +4411,7 @@ checksum = "27d684bad428a0f2481f42241f821db42c54e2dc81d8c00db8536c506b0a0144" dependencies = [ "const-oid", "ring", - "rustls 0.23.17", + "rustls 0.23.18", "tokio", "tokio-postgres", "tokio-rustls 0.26.0", @@ -4445,7 +4445,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.17", + "rustls 0.23.18", "rustls-pki-types", "tokio", ] @@ -4790,7 +4790,7 @@ dependencies = [ "flate2", "log", "once_cell", - "rustls 0.23.17", + "rustls 0.23.18", "rustls-pki-types", "serde", "serde_json", @@ -5358,7 +5358,7 @@ dependencies = [ "hyper-util", "log", "percent-encoding", - "rustls 0.23.17", + "rustls 0.23.18", "rustls-pemfile 2.2.0", "seahash", "serde", diff --git a/nexus/catalog/migrations/V41__add_metadata_tags.sql b/nexus/catalog/migrations/V41__add_metadata_tags.sql new file mode 100644 index 0000000000..e3bfd29484 --- /dev/null +++ b/nexus/catalog/migrations/V41__add_metadata_tags.sql @@ -0,0 +1,2 @@ +ALTER TABLE flows +ADD COLUMN tags JSONB; diff --git a/protos/route.proto b/protos/route.proto index 1c6d38ed69..3c902ba220 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -12,18 +12,14 @@ message CreateCDCFlowRequest { peerdb_flow.FlowConnectionConfigs connection_configs = 1; } -message CreateCDCFlowResponse { - string workflow_id = 1; -} +message CreateCDCFlowResponse { string workflow_id = 1; } message CreateQRepFlowRequest { peerdb_flow.QRepConfig qrep_config = 1; bool create_catalog_entry = 2; } -message CreateQRepFlowResponse { - string workflow_id = 1; -} +message CreateQRepFlowResponse { string workflow_id = 1; } message CreateCustomSyncRequest { string flow_job_name = 1; @@ -41,23 +37,13 @@ message AlertConfig { string service_config = 3; repeated string alert_for_mirrors = 4; } -message GetAlertConfigsRequest { -} +message GetAlertConfigsRequest {} -message PostAlertConfigRequest { - AlertConfig config = 1; -} -message DeleteAlertConfigRequest { - int32 id = 1; -} -message GetAlertConfigsResponse { - repeated AlertConfig configs = 1; -} -message PostAlertConfigResponse { - int32 id = 3; -} -message DeleteAlertConfigResponse { -} +message PostAlertConfigRequest { AlertConfig config = 1; } +message DeleteAlertConfigRequest { int32 id = 1; } +message GetAlertConfigsResponse { repeated AlertConfig configs = 1; } +message PostAlertConfigResponse { int32 id = 3; } +message DeleteAlertConfigResponse {} message DynamicSetting { string name = 1; @@ -68,17 +54,13 @@ message DynamicSetting { peerdb_flow.DynconfApplyMode apply_mode = 6; peerdb_flow.DynconfTarget target_for_setting = 7; } -message GetDynamicSettingsRequest { -} -message GetDynamicSettingsResponse { - repeated DynamicSetting settings = 1; -} +message GetDynamicSettingsRequest {} +message GetDynamicSettingsResponse { repeated DynamicSetting settings = 1; } message PostDynamicSettingRequest { string name = 1; optional string value = 2; } -message PostDynamicSettingResponse { -} +message PostDynamicSettingResponse {} message Script { int32 id = 1; @@ -86,39 +68,23 @@ message Script { string name = 3; string source = 4; } -message GetScriptsRequest { - int32 id = 1; -} -message GetScriptsResponse { - repeated Script scripts = 1; -} -message PostScriptRequest { - Script script = 1; -} -message PostScriptResponse { - int32 id = 1; -} -message DeleteScriptRequest { - int32 id = 1; -} -message DeleteScriptResponse { -} +message GetScriptsRequest { int32 id = 1; } +message GetScriptsResponse { repeated Script scripts = 1; } +message PostScriptRequest { Script script = 1; } +message PostScriptResponse { int32 id = 1; } +message DeleteScriptRequest { int32 id = 1; } +message DeleteScriptResponse {} -message ValidatePeerRequest { - peerdb_peers.Peer peer = 1; -} +message ValidatePeerRequest { peerdb_peers.Peer peer = 1; } message CreatePeerRequest { peerdb_peers.Peer peer = 1; bool allow_update = 2; } -message DropPeerRequest { - string peer_name = 1; -} +message DropPeerRequest { string peer_name = 1; } -message DropPeerResponse { -} +message DropPeerResponse {} enum ValidatePeerStatus { CREATION_UNKNOWN = 0; @@ -171,7 +137,6 @@ message CDCBatch { int64 batch_id = 6; } - message CDCRowCounts { int64 total_count = 1; int64 inserts_count = 2; @@ -182,21 +147,17 @@ message CDCTableRowCounts { string table_name = 1; CDCRowCounts counts = 2; } -message CDCTableTotalCountsRequest { - string flow_job_name = 1; -} + +message CDCTableTotalCountsRequest { string flow_job_name = 1; } + message CDCTableTotalCountsResponse { CDCRowCounts total_data = 1; repeated CDCTableRowCounts tables_data = 2; } -message PeerSchemasResponse { - repeated string schemas = 1; -} +message PeerSchemasResponse { repeated string schemas = 1; } -message PeerPublicationsResponse { - repeated string publication_names = 1; -} +message PeerPublicationsResponse { repeated string publication_names = 1; } message SchemaTablesRequest { string peer_name = 1; @@ -204,9 +165,7 @@ message SchemaTablesRequest { bool cdc_enabled = 3; } -message SchemaTablesResponse { - repeated TableResponse tables = 1; -} +message SchemaTablesResponse { repeated TableResponse tables = 1; } message TableResponse { string table_name = 1; @@ -214,9 +173,7 @@ message TableResponse { string table_size = 3; } -message AllTablesResponse { - repeated string tables = 1; -} +message AllTablesResponse { repeated string tables = 1; } message TableColumnsRequest { string peer_name = 1; @@ -224,17 +181,11 @@ message TableColumnsRequest { string table_name = 3; } -message TableColumnsResponse { - repeated string columns = 1; -} +message TableColumnsResponse { repeated string columns = 1; } -message PostgresPeerActivityInfoRequest { - string peer_name = 1; -} +message PostgresPeerActivityInfoRequest { string peer_name = 1; } -message PeerInfoRequest { - string peer_name = 1; -} +message PeerInfoRequest { string peer_name = 1; } message PeerInfoResponse { peerdb_peers.Peer peer = 1; @@ -245,8 +196,7 @@ message PeerListItem { string name = 1; peerdb_peers.DBType type = 2; } -message ListPeersRequest { -} +message ListPeersRequest {} message ListPeersResponse { repeated PeerListItem items = 1; repeated PeerListItem source_items = 2; @@ -275,9 +225,7 @@ message GetSlotLagHistoryRequest { string slot_name = 2; string time_since = 3; } -message GetSlotLagHistoryResponse { - repeated SlotLagPoint data = 1; -} +message GetSlotLagHistoryResponse { repeated SlotLagPoint data = 1; } message StatInfo { int64 pid = 1; @@ -289,13 +237,9 @@ message StatInfo { string state = 7; } -message PeerSlotResponse { - repeated SlotInfo slot_data = 1; -} +message PeerSlotResponse { repeated SlotInfo slot_data = 1; } -message PeerStatResponse { - repeated StatInfo stat_data = 1; -} +message PeerStatResponse { repeated StatInfo stat_data = 1; } message CloneTableSummary { string table_name = 1; @@ -311,9 +255,7 @@ message CloneTableSummary { string mirror_name = 11; } -message SnapshotStatus { - repeated CloneTableSummary clones = 1; -} +message SnapshotStatus { repeated CloneTableSummary clones = 1; } message CDCMirrorStatus { peerdb_flow.FlowConnectionConfigs config = 1; @@ -334,9 +276,7 @@ message MirrorStatusResponse { google.protobuf.Timestamp created_at = 7; } -message InitialLoadSummaryRequest { - string parent_mirror_name = 1; -} +message InitialLoadSummaryRequest { string parent_mirror_name = 1; } message InitialLoadSummaryResponse { repeated CloneTableSummary tableSummaries = 1; @@ -366,9 +306,7 @@ message GraphResponseItem { double rows = 2; } -message GraphResponse { - repeated GraphResponseItem data = 1; -} +message GraphResponse { repeated GraphResponseItem data = 1; } message MirrorLog { string flow_name = 1; @@ -391,8 +329,7 @@ message ListMirrorLogsResponse { int32 page = 3; } -message ValidateCDCMirrorResponse{ -} +message ValidateCDCMirrorResponse {} message ListMirrorsItem { int64 id = 1; @@ -405,17 +342,11 @@ message ListMirrorsItem { double created_at = 8; bool is_cdc = 9; } -message ListMirrorsRequest { -} -message ListMirrorsResponse { - repeated ListMirrorsItem mirrors = 1; -} +message ListMirrorsRequest {} +message ListMirrorsResponse { repeated ListMirrorsItem mirrors = 1; } -message ListMirrorNamesRequest { -} -message ListMirrorNamesResponse { - repeated string names = 1; -} +message ListMirrorNamesRequest {} +message ListMirrorNamesResponse { repeated string names = 1; } message FlowStateChangeRequest { string flow_job_name = 1; @@ -424,25 +355,19 @@ message FlowStateChangeRequest { optional peerdb_flow.FlowConfigUpdate flow_config_update = 5; bool drop_mirror_stats = 6; } -message FlowStateChangeResponse { -} +message FlowStateChangeResponse {} -message PeerDBVersionRequest { -} -message PeerDBVersionResponse { - string version = 1; -} +message PeerDBVersionRequest {} +message PeerDBVersionResponse { string version = 1; } message ResyncMirrorRequest { string flow_job_name = 1; bool drop_stats = 2; } -message ResyncMirrorResponse { -} +message ResyncMirrorResponse {} -message PeerDBStateRequest { -} +message PeerDBStateRequest {} enum InstanceStatus { INSTANCE_STATUS_UNKNOWN = 0; @@ -450,12 +375,9 @@ enum InstanceStatus { INSTANCE_STATUS_MAINTENANCE = 3; } -message InstanceInfoRequest { -} +message InstanceInfoRequest {} -message InstanceInfoResponse { - InstanceStatus status = 1; -} +message InstanceInfoResponse { InstanceStatus status = 1; } enum MaintenanceStatus { MAINTENANCE_STATUS_UNKNOWN = 0; @@ -473,166 +395,289 @@ message MaintenanceResponse { string run_id = 2; } +message FlowTag { + string key = 1; + string value = 2; +} + +message CreateOrReplaceFlowTagsRequest { + string flow_name = 1; + repeated FlowTag tags = 2; +} + +message CreateOrReplaceFlowTagsResponse { string flow_name = 1; } + +message GetFlowTagsRequest { string flow_name = 1; } + +message GetFlowTagsResponse { + string flow_name = 1; + repeated FlowTag tags = 2; +} + service FlowService { rpc ValidatePeer(ValidatePeerRequest) returns (ValidatePeerResponse) { option (google.api.http) = { - post: "/v1/peers/validate", - body: "*" - }; + post : "/v1/peers/validate", + body : "*" + }; } - rpc ValidateCDCMirror(CreateCDCFlowRequest) returns (ValidateCDCMirrorResponse) { + rpc ValidateCDCMirror(CreateCDCFlowRequest) + returns (ValidateCDCMirrorResponse) { option (google.api.http) = { - post: "/v1/mirrors/cdc/validate", - body: "*" - }; + post : "/v1/mirrors/cdc/validate", + body : "*" + }; } rpc CreatePeer(CreatePeerRequest) returns (CreatePeerResponse) { option (google.api.http) = { - post: "/v1/peers/create", - body: "*" - }; + post : "/v1/peers/create", + body : "*" + }; } rpc DropPeer(DropPeerRequest) returns (DropPeerResponse) { option (google.api.http) = { - post: "/v1/peers/drop", - body: "*" + post : "/v1/peers/drop", + body : "*" }; } rpc CreateCDCFlow(CreateCDCFlowRequest) returns (CreateCDCFlowResponse) { option (google.api.http) = { - post: "/v1/flows/cdc/create", - body: "*" - }; + post : "/v1/flows/cdc/create", + body : "*" + }; } rpc CreateQRepFlow(CreateQRepFlowRequest) returns (CreateQRepFlowResponse) { option (google.api.http) = { - post: "/v1/flows/qrep/create", - body: "*" - }; + post : "/v1/flows/qrep/create", + body : "*" + }; } - rpc CustomSyncFlow(CreateCustomSyncRequest) returns (CreateCustomSyncResponse) { + rpc CustomSyncFlow(CreateCustomSyncRequest) + returns (CreateCustomSyncResponse) { option (google.api.http) = { - post: "/v1/flows/cdc/sync", - body: "*" - }; + post : "/v1/flows/cdc/sync", + body : "*" + }; } - rpc GetAlertConfigs(GetAlertConfigsRequest) returns (GetAlertConfigsResponse) { - option (google.api.http) = { get: "/v1/alerts/config" }; + rpc GetAlertConfigs(GetAlertConfigsRequest) + returns (GetAlertConfigsResponse) { + option (google.api.http) = { + get : "/v1/alerts/config" + }; } - rpc PostAlertConfig(PostAlertConfigRequest) returns (PostAlertConfigResponse) { - option (google.api.http) = { post: "/v1/alerts/config", body: "*" }; + rpc PostAlertConfig(PostAlertConfigRequest) + returns (PostAlertConfigResponse) { + option (google.api.http) = { + post : "/v1/alerts/config", + body : "*" + }; } - rpc DeleteAlertConfig(DeleteAlertConfigRequest) returns (DeleteAlertConfigResponse) { - option (google.api.http) = { delete: "/v1/alerts/config/{id}" }; + rpc DeleteAlertConfig(DeleteAlertConfigRequest) + returns (DeleteAlertConfigResponse) { + option (google.api.http) = { + delete : "/v1/alerts/config/{id}" + }; } - rpc GetDynamicSettings(GetDynamicSettingsRequest) returns (GetDynamicSettingsResponse) { - option (google.api.http) = { get: "/v1/dynamic_settings" }; + rpc GetDynamicSettings(GetDynamicSettingsRequest) + returns (GetDynamicSettingsResponse) { + option (google.api.http) = { + get : "/v1/dynamic_settings" + }; } - rpc PostDynamicSetting(PostDynamicSettingRequest) returns (PostDynamicSettingResponse) { - option (google.api.http) = { post: "/v1/dynamic_settings", body: "*" }; + rpc PostDynamicSetting(PostDynamicSettingRequest) + returns (PostDynamicSettingResponse) { + option (google.api.http) = { + post : "/v1/dynamic_settings", + body : "*" + }; } rpc GetScripts(GetScriptsRequest) returns (GetScriptsResponse) { - option (google.api.http) = { get: "/v1/scripts/{id}" }; + option (google.api.http) = { + get : "/v1/scripts/{id}" + }; } rpc PostScript(PostScriptRequest) returns (PostScriptResponse) { - option (google.api.http) = { post: "/v1/scripts", body: "*" }; + option (google.api.http) = { + post : "/v1/scripts", + body : "*" + }; } rpc DeleteScript(DeleteScriptRequest) returns (DeleteScriptResponse) { - option (google.api.http) = { delete: "/v1/scripts/{id}" }; + option (google.api.http) = { + delete : "/v1/scripts/{id}" + }; } - rpc CDCTableTotalCounts(CDCTableTotalCountsRequest) returns (CDCTableTotalCountsResponse) { - option (google.api.http) = { get: "/v1/mirrors/cdc/table_total_counts/{flow_job_name}" }; + rpc CDCTableTotalCounts(CDCTableTotalCountsRequest) + returns (CDCTableTotalCountsResponse) { + option (google.api.http) = { + get : "/v1/mirrors/cdc/table_total_counts/{flow_job_name}" + }; } - rpc GetSchemas(PostgresPeerActivityInfoRequest) returns (PeerSchemasResponse) { - option (google.api.http) = { get: "/v1/peers/schemas" }; + rpc GetSchemas(PostgresPeerActivityInfoRequest) + returns (PeerSchemasResponse) { + option (google.api.http) = { + get : "/v1/peers/schemas" + }; } - rpc GetPublications(PostgresPeerActivityInfoRequest) returns (PeerPublicationsResponse) { - option (google.api.http) = { get: "/v1/peers/publications" }; + rpc GetPublications(PostgresPeerActivityInfoRequest) + returns (PeerPublicationsResponse) { + option (google.api.http) = { + get : "/v1/peers/publications" + }; } rpc GetTablesInSchema(SchemaTablesRequest) returns (SchemaTablesResponse) { - option (google.api.http) = { get: "/v1/peers/tables" }; + option (google.api.http) = { + get : "/v1/peers/tables" + }; } - rpc GetAllTables(PostgresPeerActivityInfoRequest) returns (AllTablesResponse) { - option (google.api.http) = { get: "/v1/peers/tables/all" }; + rpc GetAllTables(PostgresPeerActivityInfoRequest) + returns (AllTablesResponse) { + option (google.api.http) = { + get : "/v1/peers/tables/all" + }; } rpc GetColumns(TableColumnsRequest) returns (TableColumnsResponse) { - option (google.api.http) = { get: "/v1/peers/columns" }; + option (google.api.http) = { + get : "/v1/peers/columns" + }; } rpc GetSlotInfo(PostgresPeerActivityInfoRequest) returns (PeerSlotResponse) { - option (google.api.http) = { get: "/v1/peers/slots/{peer_name}" }; + option (google.api.http) = { + get : "/v1/peers/slots/{peer_name}" + }; } - rpc GetSlotLagHistory(GetSlotLagHistoryRequest) returns (GetSlotLagHistoryResponse) { - option (google.api.http) = { post: "/v1/peers/slots/lag_history", body: "*" }; + rpc GetSlotLagHistory(GetSlotLagHistoryRequest) + returns (GetSlotLagHistoryResponse) { + option (google.api.http) = { + post : "/v1/peers/slots/lag_history", + body : "*" + }; } rpc GetStatInfo(PostgresPeerActivityInfoRequest) returns (PeerStatResponse) { - option (google.api.http) = { get: "/v1/peers/stats/{peer_name}" }; + option (google.api.http) = { + get : "/v1/peers/stats/{peer_name}" + }; } rpc ListMirrorLogs(ListMirrorLogsRequest) returns (ListMirrorLogsResponse) { - option (google.api.http) = { post: "/v1/mirrors/logs", body: "*" }; + option (google.api.http) = { + post : "/v1/mirrors/logs", + body : "*" + }; } rpc ListMirrors(ListMirrorsRequest) returns (ListMirrorsResponse) { - option (google.api.http) = { get: "/v1/mirrors/list" }; + option (google.api.http) = { + get : "/v1/mirrors/list" + }; } - rpc ListMirrorNames(ListMirrorNamesRequest) returns (ListMirrorNamesResponse) { - option (google.api.http) = { get: "/v1/mirrors/names" }; + rpc ListMirrorNames(ListMirrorNamesRequest) + returns (ListMirrorNamesResponse) { + option (google.api.http) = { + get : "/v1/mirrors/names" + }; } - rpc FlowStateChange(FlowStateChangeRequest) returns (FlowStateChangeResponse) { - option (google.api.http) = { post: "/v1/mirrors/state_change", body: "*" }; + rpc FlowStateChange(FlowStateChangeRequest) + returns (FlowStateChangeResponse) { + option (google.api.http) = { + post : "/v1/mirrors/state_change", + body : "*" + }; } rpc MirrorStatus(MirrorStatusRequest) returns (MirrorStatusResponse) { - option (google.api.http) = { post: "/v1/mirrors/status", body: "*" }; + option (google.api.http) = { + post : "/v1/mirrors/status", + body : "*" + }; } rpc GetCDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) { - option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}" }; + option (google.api.http) = { + get : "/v1/mirrors/cdc/batches/{flow_job_name}" + }; } rpc CDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) { - option (google.api.http) = { post: "/v1/mirrors/cdc/batches", body: "*" }; + option (google.api.http) = { + post : "/v1/mirrors/cdc/batches", + body : "*" + }; } rpc CDCGraph(GraphRequest) returns (GraphResponse) { - option (google.api.http) = { post: "/v1/mirrors/cdc/graph", body: "*" }; + option (google.api.http) = { + post : "/v1/mirrors/cdc/graph", + body : "*" + }; } - rpc InitialLoadSummary(InitialLoadSummaryRequest) returns (InitialLoadSummaryResponse) { - option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" }; + rpc InitialLoadSummary(InitialLoadSummaryRequest) + returns (InitialLoadSummaryResponse) { + option (google.api.http) = { + get : "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" + }; } rpc GetPeerInfo(PeerInfoRequest) returns (PeerInfoResponse) { - option (google.api.http) = { get: "/v1/peers/info/{peer_name}" }; + option (google.api.http) = { + get : "/v1/peers/info/{peer_name}" + }; } rpc ListPeers(ListPeersRequest) returns (ListPeersResponse) { - option (google.api.http) = { get: "/v1/peers/list" }; + option (google.api.http) = { + get : "/v1/peers/list" + }; } rpc GetVersion(PeerDBVersionRequest) returns (PeerDBVersionResponse) { - option (google.api.http) = { get: "/v1/version" }; + option (google.api.http) = { + get : "/v1/version" + }; } rpc ResyncMirror(ResyncMirrorRequest) returns (ResyncMirrorResponse) { - option (google.api.http) = { post: "/v1/mirrors/resync", body: "*" }; + option (google.api.http) = { + post : "/v1/mirrors/resync", + body : "*" + }; } rpc GetInstanceInfo(InstanceInfoRequest) returns (InstanceInfoResponse) { - option (google.api.http) = { get: "/v1/instance/info" }; + option (google.api.http) = { + get : "/v1/instance/info" + }; } rpc Maintenance(MaintenanceRequest) returns (MaintenanceResponse) { - option (google.api.http) = { post: "/v1/instance/maintenance", body: "*" }; + option (google.api.http) = { + post : "/v1/instance/maintenance", + body : "*" + }; + } + + rpc CreateOrReplaceFlowTags(CreateOrReplaceFlowTagsRequest) + returns (CreateOrReplaceFlowTagsResponse) { + option (google.api.http) = { + post : "/v1/flows/tags", + body : "*" + }; + } + + rpc GetFlowTags(GetFlowTagsRequest) returns (GetFlowTagsResponse) { + option (google.api.http) = { + get : "/v1/flows/tags/{flow_name}" + }; } } diff --git a/renovate.json b/renovate.json index b9a8ffef6a..17de3825ed 100644 --- a/renovate.json +++ b/renovate.json @@ -25,7 +25,7 @@ "enabled": false }, { - "matchPackageNames": ["next"], + "matchPackageNames": ["next", "eslint", "eslint-config-next"], "matchManagers": ["npm"], "matchUpdateTypes": ["major"], "enabled": false diff --git a/stacks/peerdb-server.Dockerfile b/stacks/peerdb-server.Dockerfile index 50c69f8076..16f0a58eaa 100644 --- a/stacks/peerdb-server.Dockerfile +++ b/stacks/peerdb-server.Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1@sha256:865e5dd094beca432e8c0a1d5e1c465db5f998dca4e439981029b3b81fb39ed5 -FROM lukemathwalker/cargo-chef:latest-rust-alpine3.20@sha256:75f772fe2d870acb77ffdb2206810cd694a6720263f94c74fcc75080963dbff5 as chef +FROM lukemathwalker/cargo-chef:latest-rust-alpine3.20@sha256:a539f69c0a6b9d328b398f1e7aed81d53e986b49db485557cdb3e4479ea42889 as chef WORKDIR /root FROM chef as planner diff --git a/ui/app/peers/[peerName]/lagGraph.tsx b/ui/app/peers/[peerName]/lagGraph.tsx index 87b90fa8c8..d971bee8f0 100644 --- a/ui/app/peers/[peerName]/lagGraph.tsx +++ b/ui/app/peers/[peerName]/lagGraph.tsx @@ -21,9 +21,10 @@ type LagGraphProps = { function parseLSN(lsn: string): number { if (!lsn) return 0; const [lsn1, lsn2] = lsn.split('/'); - return Number( - (BigInt(parseInt(lsn1, 16)) << BigInt(32)) | BigInt(parseInt(lsn2, 16)) - ); + const parsedLsn1 = parseInt(lsn1, 16); + const parsedLsn2 = parseInt(lsn2, 16); + if (isNaN(parsedLsn1) || isNaN(parsedLsn2)) return 0; + return Number((BigInt(parsedLsn1) << BigInt(32)) | BigInt(parsedLsn2)); } export default function LagGraph({ peerName }: LagGraphProps) { diff --git a/ui/app/settings/page.tsx b/ui/app/settings/page.tsx index 7ebb1b4cd0..c1d51a2280 100644 --- a/ui/app/settings/page.tsx +++ b/ui/app/settings/page.tsx @@ -9,10 +9,7 @@ import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { SearchField } from '@/lib/SearchField'; -import { Table, TableCell, TableRow } from '@/lib/Table'; import { TextField } from '@/lib/TextField'; -import { Tooltip } from '@/lib/Tooltip'; -import { MaterialSymbol } from 'material-symbols'; import { useEffect, useMemo, useState } from 'react'; import { ToastContainer } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; @@ -22,40 +19,32 @@ const ROWS_PER_PAGE = 7; const ApplyModeIconWithTooltip = ({ applyMode }: { applyMode: number }) => { let tooltipText = ''; - let iconName: MaterialSymbol = 'help'; + switch (applyMode.toString()) { case DynconfApplyMode[DynconfApplyMode.APPLY_MODE_IMMEDIATE].toString(): tooltipText = 'Changes to this configuration will apply immediately'; - iconName = 'bolt'; break; case DynconfApplyMode[DynconfApplyMode.APPLY_MODE_AFTER_RESUME].toString(): tooltipText = 'Changes to this configuration will apply after resume'; - iconName = 'cached'; break; case DynconfApplyMode[DynconfApplyMode.APPLY_MODE_RESTART].toString(): tooltipText = 'Changes to this configuration will apply after server restart.'; - iconName = 'restart_alt'; break; case DynconfApplyMode[DynconfApplyMode.APPLY_MODE_NEW_MIRROR].toString(): tooltipText = 'Changes to this configuration will apply only to new mirrors'; - iconName = 'new_window'; break; default: tooltipText = 'Unknown apply mode'; - iconName = 'help'; } return (
- - - +
); }; - const DynamicSettingItem = ({ setting, onSettingUpdate, @@ -65,7 +54,7 @@ const DynamicSettingItem = ({ }) => { const [editMode, setEditMode] = useState(false); const [newValue, setNewValue] = useState(setting.value); - + const [showDescription, setShowDescription] = useState(false); const handleEdit = () => { setEditMode(true); }; @@ -130,41 +119,80 @@ const DynamicSettingItem = ({ }; return ( - - - - - - {editMode ? ( -
- setNewValue(e.target.value)} - variant='simple' - /> - +
+
+ +
+
+
+
+
+ setNewValue(e.target.value)} + variant='simple' + readOnly={!editMode} + disabled={!editMode} + /> + +
+
+ +
+
- ) : ( -
- {setting.value || 'N/A'} - +
+
- )} - - - {setting.defaultValue || 'N/A'} - - - {setting.description || 'N/A'} - - - - - + + {showDescription && ( +
+ +
+ )} +
+
+
); }; @@ -172,10 +200,7 @@ const SettingsPage = () => { const [settings, setSettings] = useState({ settings: [], }); - const [currentPage, setCurrentPage] = useState(1); const [searchQuery, setSearchQuery] = useState(''); - const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('asc'); - const sortField = 'name'; const fetchSettings = async () => { const response = await fetch('/api/v1/dynamic_settings'); @@ -189,101 +214,44 @@ const SettingsPage = () => { const filteredSettings = useMemo( () => - settings.settings - .filter((setting) => - setting.name.toLowerCase().includes(searchQuery.toLowerCase()) - ) - .sort((a, b) => { - const aValue = a[sortField]; - const bValue = b[sortField]; - if (aValue < bValue) return sortDir === 'dsc' ? 1 : -1; - if (aValue > bValue) return sortDir === 'dsc' ? -1 : 1; - return 0; - }), - [settings, searchQuery, sortDir] + settings.settings.filter((setting) => + setting.name.toLowerCase().includes(searchQuery.toLowerCase()) + ), + [settings, searchQuery] ); - const totalPages = Math.ceil(filteredSettings.length / ROWS_PER_PAGE); - const displayedSettings = useMemo(() => { - const startRow = (currentPage - 1) * ROWS_PER_PAGE; - const endRow = startRow + ROWS_PER_PAGE; - return filteredSettings.slice(startRow, endRow); - }, [filteredSettings, currentPage]); - - const handlePrevPage = () => { - if (currentPage > 1) setCurrentPage(currentPage - 1); - }; - - const handleNextPage = () => { - if (currentPage < totalPages) setCurrentPage(currentPage + 1); - }; return ( -
- Settings List} - toolbar={{ - left: ( -
- - - - - - -
- ), - right: ( - setSearchQuery(e.target.value)} - /> - ), +
+ + setSearchQuery(e.target.value)} + style={{ fontSize: 13 }} + /> +
- {[ - { header: 'Configuration Name', width: '35%' }, - { header: 'Current Value', width: '10%' }, - { header: 'Default Value', width: '10%' }, - { header: 'Description', width: '35%' }, - { header: 'Apply Mode', width: '10%' }, - ].map(({ header, width }) => ( - - {header} - - ))} - - } > - {displayedSettings.map((setting) => ( + {filteredSettings.map((setting) => ( ))} -
- + +
); }; diff --git a/ui/package-lock.json b/ui/package-lock.json index f233b02b90..c581877582 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -29,7 +29,7 @@ "@types/react": "^18.3.11", "@types/react-dom": "^18.3.0", "classnames": "^2.5.1", - "lucide-react": "^0.460.0", + "lucide-react": "^0.462.0", "material-symbols": "^0.27.0", "moment": "^2.30.1", "moment-timezone": "^0.5.46", @@ -53,14 +53,14 @@ "eslint": "8.57.1", "eslint-config-next": "14.2.17", "eslint-config-prettier": "9.1.0", - "less": "4.2.0", - "postcss": "8.4.47", - "prettier": "3.3.3", + "less": "4.2.1", + "postcss": "8.4.49", + "prettier": "3.4.1", "prettier-plugin-organize-imports": "4.1.0", "string-width": "7.2.0", - "tailwindcss": "3.4.14", + "tailwindcss": "3.4.15", "tailwindcss-animate": "1.0.7", - "typescript": "5.6.3", + "typescript": "5.7.2", "webpack": "5.96.1" } }, @@ -2255,12 +2255,12 @@ "license": "MIT" }, "node_modules/@types/node": { - "version": "22.9.0", - "resolved": "https://registry.npmjs.org/@types/node/-/node-22.9.0.tgz", - "integrity": "sha512-vuyHg81vvWA1Z1ELfvLko2c8f34gyA0zaic0+Rllc5lbCnbSyuvb2Oxpm6TAUAC/2xZN3QGqxBNggD1nNR2AfQ==", + "version": "22.10.0", + "resolved": "https://registry.npmjs.org/@types/node/-/node-22.10.0.tgz", + "integrity": "sha512-XC70cRZVElFHfIUB40FgZOBbgJYFKKMa5nb9lxcwYstFG/Mi+/Y0bGS+rs6Dmhmkpq4pnNiLiuZAbc02YCOnmA==", "license": "MIT", "dependencies": { - "undici-types": "~6.19.8" + "undici-types": "~6.20.0" } }, "node_modules/@types/parse-json": { @@ -5870,9 +5870,9 @@ } }, "node_modules/less": { - "version": "4.2.0", - "resolved": "https://registry.npmjs.org/less/-/less-4.2.0.tgz", - "integrity": "sha512-P3b3HJDBtSzsXUl0im2L7gTO5Ubg8mEN6G8qoTS77iXxXX4Hvu4Qj540PZDvQ8V6DmX6iXo98k7Md0Cm1PrLaA==", + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/less/-/less-4.2.1.tgz", + "integrity": "sha512-CasaJidTIhWmjcqv0Uj5vccMI7pJgfD9lMkKtlnTHAdJdYK/7l8pM9tumLyJ0zhbD4KJLo/YvTj+xznQd5NBhg==", "dev": true, "license": "Apache-2.0", "dependencies": { @@ -6001,9 +6001,9 @@ } }, "node_modules/lucide-react": { - "version": "0.460.0", - "resolved": "https://registry.npmjs.org/lucide-react/-/lucide-react-0.460.0.tgz", - "integrity": "sha512-BVtq/DykVeIvRTJvRAgCsOwaGL8Un3Bxh8MbDxMhEWlZay3T4IpEKDEpwt5KZ0KJMHzgm6jrltxlT5eXOWXDHg==", + "version": "0.462.0", + "resolved": "https://registry.npmjs.org/lucide-react/-/lucide-react-0.462.0.tgz", + "integrity": "sha512-NTL7EbAao9IFtuSivSZgrAh4fZd09Lr+6MTkqIxuHaH2nnYiYIzXPo06cOxHg9wKLdj6LL8TByG4qpePqwgx/g==", "license": "ISC", "peerDependencies": { "react": "^16.5.1 || ^17.0.0 || ^18.0.0 || ^19.0.0-rc" @@ -6036,9 +6036,9 @@ } }, "node_modules/material-symbols": { - "version": "0.27.0", - "resolved": "https://registry.npmjs.org/material-symbols/-/material-symbols-0.27.0.tgz", - "integrity": "sha512-nRHpnw2Cz7eNl6GptgHHhkjauL0zvkYsuiqy1HBifOYCY4fdbZ/PwtdZN4RNmwA+9jQPoymvlArVPPX5nYTdZg==", + "version": "0.27.1", + "resolved": "https://registry.npmjs.org/material-symbols/-/material-symbols-0.27.1.tgz", + "integrity": "sha512-ICw3sP2EyCsxo1T2vvQGhxcUX8sqb3FYLF0vTUOjCNPdJ8G1Z3bn3wjAh2ZIdP/AfGy96zuBY5okK3Ag4XLyVw==", "license": "Apache-2.0" }, "node_modules/memoize-one": { @@ -6752,9 +6752,9 @@ } }, "node_modules/postcss": { - "version": "8.4.47", - "resolved": "https://registry.npmjs.org/postcss/-/postcss-8.4.47.tgz", - "integrity": "sha512-56rxCq7G/XfB4EkXq9Egn5GCqugWvDFjafDOThIdMBsI15iqPqR5r15TfSr1YPYeEI19YeaXMCbY6u88Y76GLQ==", + "version": "8.4.49", + "resolved": "https://registry.npmjs.org/postcss/-/postcss-8.4.49.tgz", + "integrity": "sha512-OCVPnIObs4N29kxTjzLfUryOkvZEq+pf8jTF0lg8E7uETuWHA+v7j3c/xJmiqpX450191LlmZfUKkXxkTry7nA==", "funding": [ { "type": "opencollective", @@ -6772,7 +6772,7 @@ "license": "MIT", "dependencies": { "nanoid": "^3.3.7", - "picocolors": "^1.1.0", + "picocolors": "^1.1.1", "source-map-js": "^1.2.1" }, "engines": { @@ -6951,9 +6951,9 @@ } }, "node_modules/prettier": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.3.3.tgz", - "integrity": "sha512-i2tDNA0O5IrMO757lfrdQZCc2jPNDVntV0m/+4whiDfWaTKfMNgR7Qz0NAeGz/nRqF4m5/6CLzbP4/liHt12Ew==", + "version": "3.4.1", + "resolved": "https://registry.npmjs.org/prettier/-/prettier-3.4.1.tgz", + "integrity": "sha512-G+YdqtITVZmOJje6QkXQWzl3fSfMxFwm1tjTyo9exhkmWSqC4Yhd1+lug++IlR2mvRVAxEDDWYkQdeSztajqgg==", "dev": true, "license": "MIT", "bin": { @@ -8225,33 +8225,33 @@ } }, "node_modules/tailwindcss": { - "version": "3.4.14", - "resolved": "https://registry.npmjs.org/tailwindcss/-/tailwindcss-3.4.14.tgz", - "integrity": "sha512-IcSvOcTRcUtQQ7ILQL5quRDg7Xs93PdJEk1ZLbhhvJc7uj/OAhYOnruEiwnGgBvUtaUAJ8/mhSw1o8L2jCiENA==", + "version": "3.4.15", + "resolved": "https://registry.npmjs.org/tailwindcss/-/tailwindcss-3.4.15.tgz", + "integrity": "sha512-r4MeXnfBmSOuKUWmXe6h2CcyfzJCEk4F0pptO5jlnYSIViUkVmsawj80N5h2lO3gwcmSb4n3PuN+e+GC1Guylw==", "license": "MIT", "dependencies": { "@alloc/quick-lru": "^5.2.0", "arg": "^5.0.2", - "chokidar": "^3.5.3", + "chokidar": "^3.6.0", "didyoumean": "^1.2.2", "dlv": "^1.1.3", - "fast-glob": "^3.3.0", + "fast-glob": "^3.3.2", "glob-parent": "^6.0.2", "is-glob": "^4.0.3", - "jiti": "^1.21.0", + "jiti": "^1.21.6", "lilconfig": "^2.1.0", - "micromatch": "^4.0.5", + "micromatch": "^4.0.8", "normalize-path": "^3.0.0", "object-hash": "^3.0.0", - "picocolors": "^1.0.0", - "postcss": "^8.4.23", + "picocolors": "^1.1.1", + "postcss": "^8.4.47", "postcss-import": "^15.1.0", "postcss-js": "^4.0.1", - "postcss-load-config": "^4.0.1", - "postcss-nested": "^6.0.1", - "postcss-selector-parser": "^6.0.11", - "resolve": "^1.22.2", - "sucrase": "^3.32.0" + "postcss-load-config": "^4.0.2", + "postcss-nested": "^6.2.0", + "postcss-selector-parser": "^6.1.2", + "resolve": "^1.22.8", + "sucrase": "^3.35.0" }, "bin": { "tailwind": "lib/cli.js", @@ -8558,9 +8558,9 @@ } }, "node_modules/typescript": { - "version": "5.6.3", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.6.3.tgz", - "integrity": "sha512-hjcS1mhfuyi4WW8IWtjP7brDrG2cuDZukyrYrSauoXGNgx0S7zceP07adYkJycEr56BOUTNPzbInooiN3fn1qw==", + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-5.7.2.tgz", + "integrity": "sha512-i5t66RHxDvVN40HfDd1PsEThGNnlMCMT3jMUuoh9/0TaqWevNontacunWyN02LA9/fIbEWlcHZcgTKb9QoaLfg==", "dev": true, "license": "Apache-2.0", "bin": { @@ -8588,9 +8588,9 @@ } }, "node_modules/undici-types": { - "version": "6.19.8", - "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.19.8.tgz", - "integrity": "sha512-ve2KP6f/JnbPBFyobGHuerC9g1FYGn/F8n1LWTwNxCEzd6IfqTwUQcNXgEtmmQ6DlRrC1hrSrBnCZPokRrDHjw==", + "version": "6.20.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.20.0.tgz", + "integrity": "sha512-Ny6QZ2Nju20vw1SRHe3d9jVu6gJ+4e3+MMpqu7pqE5HT6WsTSlce++GQmK5UXS8mzV8DSYHrQH+Xrf2jVcuKNg==", "license": "MIT" }, "node_modules/unicorn-magic": { diff --git a/ui/package.json b/ui/package.json index d755ce8640..9f4833398d 100644 --- a/ui/package.json +++ b/ui/package.json @@ -31,7 +31,7 @@ "@types/react": "^18.3.11", "@types/react-dom": "^18.3.0", "classnames": "^2.5.1", - "lucide-react": "^0.460.0", + "lucide-react": "^0.462.0", "material-symbols": "^0.27.0", "moment": "^2.30.1", "moment-timezone": "^0.5.46", @@ -55,14 +55,14 @@ "eslint": "8.57.1", "eslint-config-next": "14.2.17", "eslint-config-prettier": "9.1.0", - "less": "4.2.0", - "postcss": "8.4.47", - "prettier": "3.3.3", + "less": "4.2.1", + "postcss": "8.4.49", + "prettier": "3.4.1", "prettier-plugin-organize-imports": "4.1.0", "string-width": "7.2.0", - "tailwindcss": "3.4.14", + "tailwindcss": "3.4.15", "tailwindcss-animate": "1.0.7", - "typescript": "5.6.3", + "typescript": "5.7.2", "webpack": "5.96.1" } }