From b23911e935f7af949fa4ec8adeb9463986c9e307 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Wed, 30 Oct 2024 02:06:29 +0530 Subject: [PATCH 01/11] feat(temporal): enable sdk metrics (#2200) - Adds a new env variable to enable specific metric from Temporal SDK: - `PEERDB_TEMPORAL_OTEL_METRICS_EXPORT_LIST` can be a comma separated list of metrics to expose. If the list is empty or no metric matches the elements in the list, then they are not exported - If the value is set to `__ALL__`, then all Temporal metrics are exposed - Additionally adds an initial interface of how temporal interceptors would look like - The added metrics include `temporal_workflow_task_execution_failed` metric having an attribute of `failure_reason` whose value can tell us about nondeterminism if the value is `nondeterminismerror` (https://github.com/temporalio/sdk-go/pull/1295) - Additionally, the metrics are prefixed with `temporal.` PeerDB metrics should not be affected as they use a separate exporter and meterprovider. The rationale behind exporting a subset of metrics is noise and cases where metrics ingestion cannot be ignored Full list of metrics can be viewed at https://docs.temporal.io/references/sdk-metrics --- flow/cmd/logged_interceptor.go | 79 +++++++++++++++++ flow/cmd/worker.go | 16 +++- flow/connectors/postgres/postgres.go | 29 +++---- flow/go.mod | 6 +- flow/go.sum | 10 +++ .../{peerdb_gauges => }/attributes.go | 2 +- flow/otel_metrics/env.go | 11 +++ flow/otel_metrics/otel_manager.go | 84 +++++++++++++++++-- flow/otel_metrics/peerdb_gauges/gauges.go | 3 +- 9 files changed, 214 insertions(+), 26 deletions(-) create mode 100644 flow/cmd/logged_interceptor.go rename flow/otel_metrics/{peerdb_gauges => }/attributes.go (88%) create mode 100644 flow/otel_metrics/env.go diff --git a/flow/cmd/logged_interceptor.go b/flow/cmd/logged_interceptor.go new file mode 100644 index 0000000000..d5ee4fc972 --- /dev/null +++ b/flow/cmd/logged_interceptor.go @@ -0,0 +1,79 @@ +package cmd + +import ( + "context" + + "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/workflow" +) + +type LoggedWorkflowInboundInterceptor struct { + interceptor.WorkflowInboundInterceptorBase + Next interceptor.WorkflowInboundInterceptor +} + +func NewLoggedWorkflowInboundInterceptor(next interceptor.WorkflowInboundInterceptor) *LoggedWorkflowInboundInterceptor { + return &LoggedWorkflowInboundInterceptor{ + WorkflowInboundInterceptorBase: interceptor.WorkflowInboundInterceptorBase{Next: next}, + Next: next, + } +} + +func (w *LoggedWorkflowInboundInterceptor) ExecuteWorkflow( + ctx workflow.Context, + in *interceptor.ExecuteWorkflowInput, +) (interface{}, error) { + // Workflow starts here + result, err := w.Next.ExecuteWorkflow(ctx, in) + // Workflow ends here + return result, err +} + +type LoggedActivityInboundInterceptor struct { + interceptor.ActivityInboundInterceptorBase + Next interceptor.ActivityInboundInterceptor +} + +func NewLoggedActivityInboundInterceptor(next interceptor.ActivityInboundInterceptor) *LoggedActivityInboundInterceptor { + return &LoggedActivityInboundInterceptor{ + ActivityInboundInterceptorBase: interceptor.ActivityInboundInterceptorBase{Next: next}, + Next: next, + } +} + +func (c *LoggedActivityInboundInterceptor) ExecuteActivity( + ctx context.Context, + in *interceptor.ExecuteActivityInput, +) (interface{}, error) { + // Activity starts here + out, err := c.Next.ExecuteActivity(ctx, in) + // Activity ends here + return out, err +} + +type 
LoggedWorkerInterceptor struct { + interceptor.WorkerInterceptorBase +} + +func (c LoggedWorkerInterceptor) InterceptActivity( + ctx context.Context, + next interceptor.ActivityInboundInterceptor, +) interceptor.ActivityInboundInterceptor { + return NewLoggedActivityInboundInterceptor(next) +} + +func (c LoggedWorkerInterceptor) InterceptWorkflow( + ctx workflow.Context, + next interceptor.WorkflowInboundInterceptor, +) interceptor.WorkflowInboundInterceptor { + // Workflow intercepted here + intercepted := NewLoggedWorkflowInboundInterceptor(next) + // Workflow intercepting ends here + return intercepted +} + +func NewLoggedWorkerInterceptor() *LoggedWorkerInterceptor { + return &LoggedWorkerInterceptor{ + WorkerInterceptorBase: interceptor.WorkerInterceptorBase{}, + } +} diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index bf2809d10c..9db97288cc 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/pyroscope-go" "go.temporal.io/sdk/client" + temporalotel "go.temporal.io/sdk/contrib/opentelemetry" "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" @@ -88,6 +89,15 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { Namespace: opts.TemporalNamespace, Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))), } + if opts.EnableOtelMetrics { + metricsProvider, metricsErr := otel_metrics.SetupTemporalMetricsProvider("flow-worker") + if metricsErr != nil { + return nil, metricsErr + } + clientOptions.MetricsHandler = temporalotel.NewMetricsHandler(temporalotel.MetricsHandlerOptions{ + Meter: metricsProvider.Meter("temporal-sdk-go"), + }) + } if peerdbenv.PeerDBTemporalEnableCertAuth() { slog.Info("Using temporal certificate/key for authentication") @@ -136,9 +146,9 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { cleanupOtelManagerFunc := func() {} var otelManager *otel_metrics.OtelManager if opts.EnableOtelMetrics { - metricsProvider, metricErr := otel_metrics.SetupOtelMetricsExporter("flow-worker") - if metricErr != nil { - return nil, metricErr + metricsProvider, metricsErr := otel_metrics.SetupPeerDBMetricsProvider("flow-worker") + if metricsErr != nil { + return nil, metricsErr } otelManager = &otel_metrics.OtelManager{ MetricsProvider: metricsProvider, diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index ad7ca3951d..d0087d3beb 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -26,7 +26,8 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - peerdb_gauges "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" + "github.com/PeerDB-io/peer-flow/otel_metrics" + "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -1192,10 +1193,10 @@ func (c *PostgresConnector) HandleSlotInfo( slog.Float64("LagInMB", float64(slotInfo[0].LagInMb))) alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0]) slotMetricGauges.SlotLagGauge.Set(float64(slotInfo[0].LagInMb), attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.SlotNameKey, alertKeys.SlotName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + 
attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) // Also handles alerts for PeerDB user connections exceeding a given limit here res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User) @@ -1205,9 +1206,9 @@ func (c *PostgresConnector) HandleSlotInfo( } alerter.AlertIfOpenConnections(ctx, alertKeys, res) slotMetricGauges.OpenConnectionsGauge.Set(res.CurrentOpenConnections, attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) replicationRes, err := getOpenReplicationConnectionsForUser(ctx, c.conn, c.config.User) if err != nil { @@ -1216,9 +1217,9 @@ func (c *PostgresConnector) HandleSlotInfo( } slotMetricGauges.OpenReplicationConnectionsGauge.Set(replicationRes.CurrentOpenConnections, attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) var intervalSinceLastNormalize *time.Duration err = alerter.CatalogPool.QueryRow(ctx, "SELECT now()-max(end_time) FROM peerdb_stats.cdc_batches WHERE flow_name=$1", @@ -1233,9 +1234,9 @@ func (c *PostgresConnector) HandleSlotInfo( } if intervalSinceLastNormalize != nil { slotMetricGauges.IntervalSinceLastNormalizeGauge.Set(intervalSinceLastNormalize.Seconds(), attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) alerter.AlertIfTooLongSinceLastNormalize(ctx, alertKeys, *intervalSinceLastNormalize) } diff --git a/flow/go.mod b/flow/go.mod index e6c76516d7..e24ffa9fb0 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -56,11 +56,16 @@ require ( go.opentelemetry.io/otel v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 go.opentelemetry.io/otel/metric v1.31.0 go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 + go.opentelemetry.io/otel/trace v1.31.0 go.temporal.io/api v1.40.0 go.temporal.io/sdk v1.29.1 + go.temporal.io/sdk/contrib/opentelemetry v0.6.0 go.uber.org/automaxprocs v1.6.0 golang.org/x/crypto v0.28.0 
golang.org/x/sync v0.8.0 @@ -139,7 +144,6 @@ require ( go.opentelemetry.io/contrib/detectors/gcp v1.31.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect - go.opentelemetry.io/otel/trace v1.31.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/term v0.25.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index b7992ec679..8f783af565 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -490,6 +490,12 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 h1:FZ6 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0/go.mod h1:MdEu/mC6j3D+tTEfvI15b5Ci2Fn7NneJ71YMoiS3tpI= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 h1:ZsXq73BERAiNuuFXYqP4MR5hBrjXfMGSO+Cx7qoOZiM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0/go.mod h1:hg1zaDMpyZJuUzjFxFsRYBoccE86tM9Uf4IqNMUxvrY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 h1:K0XaT3DwHAcV4nKLzcQvwAgSyisUghWoY20I7huthMk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0/go.mod h1:B5Ki776z/MBnVha1Nzwp5arlzBbE3+1jk+pGmaP5HME= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 h1:FFeLy03iVTXP6ffeN2iXrxfGsZGCjVx0/4KlizjyBwU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0/go.mod h1:TMu73/k1CP8nBUpDLc71Wj/Kf7ZS9FK5b53VapRsP9o= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= @@ -504,10 +510,14 @@ go.temporal.io/api v1.40.0 h1:rH3HvUUCFr0oecQTBW5tI6DdDQsX2Xb6OFVgt/bvLto= go.temporal.io/api v1.40.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0= go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ= +go.temporal.io/sdk/contrib/opentelemetry v0.6.0 h1:rNBArDj5iTUkcMwKocUShoAW59o6HdS7Nq4CTp4ldj8= +go.temporal.io/sdk/contrib/opentelemetry v0.6.0/go.mod h1:Lem8VrE2ks8P+FYcRM3UphPoBr+tfM3v/Kaf0qStzSg= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/flow/otel_metrics/peerdb_gauges/attributes.go b/flow/otel_metrics/attributes.go similarity index 88% rename from flow/otel_metrics/peerdb_gauges/attributes.go rename to 
flow/otel_metrics/attributes.go index 78b54b6119..bd17cfeeb2 100644 --- a/flow/otel_metrics/peerdb_gauges/attributes.go +++ b/flow/otel_metrics/attributes.go @@ -1,4 +1,4 @@ -package peerdb_gauges +package otel_metrics const ( PeerNameKey string = "peerName" diff --git a/flow/otel_metrics/env.go b/flow/otel_metrics/env.go new file mode 100644 index 0000000000..f388bf664d --- /dev/null +++ b/flow/otel_metrics/env.go @@ -0,0 +1,11 @@ +package otel_metrics + +import "github.com/PeerDB-io/peer-flow/peerdbenv" + +func GetPeerDBOtelMetricsNamespace() string { + return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") +} + +func GetPeerDBOtelMetricsExportListEnv() string { + return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_EXPORT_LIST", "") +} diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index 112124d203..f69a4aa30f 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -3,7 +3,10 @@ package otel_metrics import ( "context" "fmt" + "log/slog" + "strings" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/metric" @@ -22,12 +25,16 @@ type OtelManager struct { } // newOtelResource returns a resource describing this application. -func newOtelResource(otelServiceName string) (*resource.Resource, error) { +func newOtelResource(otelServiceName string, attrs ...attribute.KeyValue) (*resource.Resource, error) { + allAttrs := []attribute.KeyValue{ + semconv.ServiceNameKey.String(otelServiceName), + } + allAttrs = append(allAttrs, attrs...) r, err := resource.Merge( resource.Default(), resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceNameKey.String(otelServiceName), + allAttrs..., ), ) @@ -42,7 +49,53 @@ func setupGrpcOtelMetricsExporter() (sdkmetric.Exporter, error) { return otlpmetricgrpc.New(context.Background()) } -func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider, error) { +func temporalMetricsFilteringView() sdkmetric.View { + exportListString := GetPeerDBOtelMetricsExportListEnv() + slog.Info("Found export list for temporal metrics", slog.String("exportList", exportListString)) + // Special case for exporting all metrics + if exportListString == "__ALL__" { + return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { + stream := sdkmetric.Stream{ + Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Description: instrument.Description, + Unit: instrument.Unit, + } + return stream, true + } + } + exportList := strings.Split(exportListString, ",") + // Don't export any metrics if the list is empty + if len(exportList) == 0 { + return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { + return sdkmetric.Stream{ + Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Description: instrument.Description, + Unit: instrument.Unit, + Aggregation: sdkmetric.AggregationDrop{}, + }, true + } + } + + // Export only the metrics in the list + enabledMetrics := make(map[string]struct{}, len(exportList)) + for _, metricName := range exportList { + trimmedMetricName := strings.TrimSpace(metricName) + enabledMetrics[trimmedMetricName] = struct{}{} + } + return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { + stream := sdkmetric.Stream{ + Name: GetPeerDBOtelMetricsNamespace() + "temporal." 
+ instrument.Name, + Description: instrument.Description, + Unit: instrument.Unit, + } + if _, ok := enabledMetrics[instrument.Name]; !ok { + stream.Aggregation = sdkmetric.AggregationDrop{} + } + return stream, true + } +} + +func setupExporter() (sdkmetric.Exporter, error) { otlpMetricProtocol := peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_PROTOCOL", peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", "http/protobuf")) var metricExporter sdkmetric.Exporter @@ -58,14 +111,35 @@ func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider, if err != nil { return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err) } - otelResource, err := newOtelResource(otelServiceName) + return metricExporter, err +} + +func setupMetricsProvider(otelResource *resource.Resource, views ...sdkmetric.View) (*sdkmetric.MeterProvider, error) { + metricExporter, err := setupExporter() if err != nil { - return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + return nil, err } meterProvider := sdkmetric.NewMeterProvider( sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter)), sdkmetric.WithResource(otelResource), + sdkmetric.WithView(views...), ) return meterProvider, nil } + +func SetupPeerDBMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) { + otelResource, err := newOtelResource(otelServiceName) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } + return setupMetricsProvider(otelResource) +} + +func SetupTemporalMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) { + otelResource, err := newOtelResource(otelServiceName, attribute.String(DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } + return setupMetricsProvider(otelResource, temporalMetricsFilteringView()) +} diff --git a/flow/otel_metrics/peerdb_gauges/gauges.go b/flow/otel_metrics/peerdb_gauges/gauges.go index 6f8f4f0c54..767aac0945 100644 --- a/flow/otel_metrics/peerdb_gauges/gauges.go +++ b/flow/otel_metrics/peerdb_gauges/gauges.go @@ -2,7 +2,6 @@ package peerdb_gauges import ( "github.com/PeerDB-io/peer-flow/otel_metrics" - "github.com/PeerDB-io/peer-flow/peerdbenv" ) const ( @@ -20,5 +19,5 @@ type SlotMetricGauges struct { } func BuildGaugeName(baseGaugeName string) string { - return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") + baseGaugeName + return otel_metrics.GetPeerDBOtelMetricsNamespace() + baseGaugeName } From 37546bba65cd8d9fa6ced99893137b4958903682 Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Wed, 30 Oct 2024 09:59:00 +0530 Subject: [PATCH 02/11] refactor: update temporal metrics list env (#2204) --- flow/otel_metrics/env.go | 4 ++-- flow/otel_metrics/otel_manager.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flow/otel_metrics/env.go b/flow/otel_metrics/env.go index f388bf664d..81b5d0c3ea 100644 --- a/flow/otel_metrics/env.go +++ b/flow/otel_metrics/env.go @@ -6,6 +6,6 @@ func GetPeerDBOtelMetricsNamespace() string { return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") } -func GetPeerDBOtelMetricsExportListEnv() string { - return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_EXPORT_LIST", "") +func GetPeerDBOtelTemporalMetricsExportListEnv() string { + return peerdbenv.GetEnvString("PEERDB_OTEL_TEMPORAL_METRICS_EXPORT_LIST", "") } diff --git 
a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index f69a4aa30f..becf13a16f 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -50,7 +50,7 @@ func setupGrpcOtelMetricsExporter() (sdkmetric.Exporter, error) { } func temporalMetricsFilteringView() sdkmetric.View { - exportListString := GetPeerDBOtelMetricsExportListEnv() + exportListString := GetPeerDBOtelTemporalMetricsExportListEnv() slog.Info("Found export list for temporal metrics", slog.String("exportList", exportListString)) // Special case for exporting all metrics if exportListString == "__ALL__" { From 159d8686c8bf03bfd8d210831c621840b2d17fb5 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 30 Oct 2024 19:23:23 +0530 Subject: [PATCH 03/11] reduce setup srcconn spam and hushWarn for message types (#2205) --- flow/activities/flowable_core.go | 12 +++++++++-- flow/connectors/postgres/cdc.go | 35 ++++++++++++++++++-------------- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 66040396ce..db04efea30 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -53,6 +53,9 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, var none TPull logger := activity.GetLogger(ctx) attempt := 0 + waitInterval := time.Second + // try for 5 minutes, once per second + // after that, try indefinitely every minute for { a.CdcCacheRw.RLock() entry, ok := a.CdcCache[sessionID] @@ -63,7 +66,7 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, } return none, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector) } - activity.RecordHeartbeat(ctx, "wait another second for source connector") + activity.RecordHeartbeat(ctx, fmt.Sprintf("wait %s for source connector", waitInterval)) attempt += 1 if attempt > 2 { logger.Info("waiting on source connector setup", @@ -72,7 +75,12 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, if err := ctx.Err(); err != nil { return none, err } - time.Sleep(time.Second) + time.Sleep(waitInterval) + if attempt == 300 { + logger.Info("source connector not setup in time, transition to slow wait", + slog.String("sessionID", sessionID)) + waitInterval = time.Minute + } } } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 91eaf3eba7..a355cfa00e 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -40,8 +40,9 @@ type PostgresCDCSource struct { childToParentRelIDMapping map[uint32]uint32 // for storing schema delta audit logs to catalog - catalogPool *pgxpool.Pool - flowJobName string + catalogPool *pgxpool.Pool + hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{} + flowJobName string } type PostgresCDCConfig struct { @@ -59,18 +60,19 @@ type PostgresCDCConfig struct { // Create a new PostgresCDCSource func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource { return &PostgresCDCSource{ - PostgresConnector: c, - srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, - tableNameMapping: cdcConfig.TableNameMapping, - tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping, - relationMessageMapping: cdcConfig.RelationMessageMapping, - slot: cdcConfig.Slot, - publication: cdcConfig.Publication, - childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, - 
typeMap: pgtype.NewMap(), - commitLock: nil, - catalogPool: cdcConfig.CatalogPool, - flowJobName: cdcConfig.FlowJobName, + PostgresConnector: c, + srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, + tableNameMapping: cdcConfig.TableNameMapping, + tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping, + relationMessageMapping: cdcConfig.RelationMessageMapping, + slot: cdcConfig.Slot, + publication: cdcConfig.Publication, + childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, + typeMap: pgtype.NewMap(), + commitLock: nil, + catalogPool: cdcConfig.CatalogPool, + flowJobName: cdcConfig.FlowJobName, + hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}), } } @@ -678,7 +680,10 @@ func processMessage[Items model.Items]( }, nil default: - logger.Debug(fmt.Sprintf("%T not supported", msg)) + if _, ok := p.hushWarnUnhandledMessageType[msg.Type()]; !ok { + logger.Warn(fmt.Sprintf("Unhandled message type: %T", msg)) + p.hushWarnUnhandledMessageType[msg.Type()] = struct{}{} + } } return nil, nil From 7d635029360001a5c597c19cd7cb0da7fefe1fda Mon Sep 17 00:00:00 2001 From: Kunal Gupta <39487888+iamKunalGupta@users.noreply.github.com> Date: Wed, 30 Oct 2024 20:45:49 +0530 Subject: [PATCH 04/11] feat: add request logging interceptor for API (#2203) Set `PEERDB_API_REQUEST_LOGGING_ENABLED` = true to enable request logging. Separating it from the oauth interceptor incase oauth is disabled and we wish to enable request logging --- flow/cmd/api.go | 15 ++++-- flow/middleware/logging.go | 31 ++++++++++++ .../middleware.go => middleware/oauth.go} | 48 ++++++++----------- flow/peerdbenv/config.go | 10 ++++ flow/peerdbenv/oauth.go | 6 ++- 5 files changed, 77 insertions(+), 33 deletions(-) create mode 100644 flow/middleware/logging.go rename flow/{auth/middleware.go => middleware/oauth.go} (81%) diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 58e6beac0f..ca225e4292 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -23,8 +23,8 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" - "github.com/PeerDB-io/peer-flow/auth" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/middleware" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -213,14 +213,23 @@ func APIMain(ctx context.Context, args *APIServerParams) error { return fmt.Errorf("unable to create Temporal client: %w", err) } - options, err := auth.AuthGrpcMiddleware([]string{ + authGrpcMiddleware, err := middleware.AuthGrpcMiddleware([]string{ grpc_health_v1.Health_Check_FullMethodName, grpc_health_v1.Health_Watch_FullMethodName, }) if err != nil { return err } - grpcServer := grpc.NewServer(options...) 
+ + requestLoggingMiddleware := middleware.RequestLoggingMiddleWare() + + // Interceptors are executed in the order they are passed to, so unauthorized requests are not logged + interceptors := grpc.ChainUnaryInterceptor( + authGrpcMiddleware, + requestLoggingMiddleware, + ) + + grpcServer := grpc.NewServer(interceptors) catalogPool, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { diff --git a/flow/middleware/logging.go b/flow/middleware/logging.go new file mode 100644 index 0000000000..51932700fe --- /dev/null +++ b/flow/middleware/logging.go @@ -0,0 +1,31 @@ +package middleware + +import ( + "context" + "log/slog" + + "google.golang.org/grpc" + + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +func RequestLoggingMiddleWare() grpc.UnaryServerInterceptor { + if !peerdbenv.PeerDBRAPIRequestLoggingEnabled() { + slog.Info("Request Logging Interceptor is disabled") + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + return handler(ctx, req) + } + } + slog.Info("Setting up request logging middleware") + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + slog.Info("Received gRPC request", slog.String("method", info.FullMethod)) + + resp, err := handler(ctx, req) + if err != nil { + slog.Error("gRPC request failed", slog.String("method", info.FullMethod), slog.Any("error", err)) + } else { + slog.Info("gRPC request completed successfully", slog.String("method", info.FullMethod)) + } + return resp, err + } +} diff --git a/flow/auth/middleware.go b/flow/middleware/oauth.go similarity index 81% rename from flow/auth/middleware.go rename to flow/middleware/oauth.go index bb3ee34da5..52bbc03672 100644 --- a/flow/auth/middleware.go +++ b/flow/middleware/oauth.go @@ -1,4 +1,4 @@ -package auth +package middleware import ( "context" @@ -34,7 +34,7 @@ type identityProvider struct { issuer string } -func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, error) { +func AuthGrpcMiddleware(unauthenticatedMethods []string) (grpc.UnaryServerInterceptor, error) { oauthConfig := peerdbenv.GetPeerDBOAuthConfig() oauthJwtClaims := map[string]string{} if oauthConfig.OAuthJwtClaimKey != "" { @@ -57,7 +57,9 @@ func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, e slog.Warn("authentication is disabled") - return nil, nil + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + return handler(ctx, req) + }, nil } if err != nil { @@ -68,36 +70,24 @@ func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, e for _, method := range unauthenticatedMethods { unauthenticatedMethodsMap[method] = struct{}{} } - return []grpc.ServerOption{ - grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - slog.Info("Received gRPC request", slog.String("method", info.FullMethod)) - - if _, unauthorized := unauthenticatedMethodsMap[info.FullMethod]; !unauthorized { - var authHeader string - authHeaders := metadata.ValueFromIncomingContext(ctx, "Authorization") - if len(authHeaders) == 1 { - authHeader = authHeaders[0] - } else if len(authHeaders) > 1 { - slog.Warn("Multiple Authorization headers supplied, request rejected", slog.String("method", info.FullMethod)) - return nil, status.Errorf(codes.Unauthenticated, "multiple Authorization headers supplied, request rejected") - } - _, err 
:= validateRequestToken(authHeader, cfg.OauthJwtCustomClaims, ip...) - if err != nil { - slog.Debug("Failed to validate request token", slog.String("method", info.FullMethod), slog.Any("error", err)) - return nil, status.Errorf(codes.Unauthenticated, "%s", err.Error()) - } + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + if _, unauthorized := unauthenticatedMethodsMap[info.FullMethod]; !unauthorized { + var authHeader string + authHeaders := metadata.ValueFromIncomingContext(ctx, "Authorization") + if len(authHeaders) == 1 { + authHeader = authHeaders[0] + } else if len(authHeaders) > 1 { + slog.Warn("Multiple Authorization headers supplied, request rejected", slog.String("method", info.FullMethod)) + return nil, status.Errorf(codes.Unauthenticated, "multiple Authorization headers supplied, request rejected") } - - resp, err := handler(ctx, req) - + _, err := validateRequestToken(authHeader, cfg.OauthJwtCustomClaims, ip...) if err != nil { - slog.Error("gRPC request failed", slog.String("method", info.FullMethod), slog.Any("error", err)) - } else { - slog.Info("gRPC request completed successfully", slog.String("method", info.FullMethod)) + slog.Debug("Failed to validate request token", slog.String("method", info.FullMethod), slog.Any("error", err)) + return nil, status.Errorf(codes.Unauthenticated, "%s", err.Error()) } + } - return resp, err - }), + return handler(ctx, req) }, nil } diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index ecae67037f..e033b87195 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log/slog" + "strconv" "strings" "time" @@ -156,3 +157,12 @@ func PeerDBGetIncidentIoUrl() string { func PeerDBGetIncidentIoToken() string { return GetEnvString("PEERDB_INCIDENTIO_TOKEN", "") } + +func PeerDBRAPIRequestLoggingEnabled() bool { + requestLoggingEnabled, err := strconv.ParseBool(GetEnvString("PEERDB_API_REQUEST_LOGGING_ENABLED", "false")) + if err != nil { + slog.Error("failed to parse PEERDB_API_REQUEST_LOGGING_ENABLED to bool", "error", err) + return false + } + return requestLoggingEnabled +} diff --git a/flow/peerdbenv/oauth.go b/flow/peerdbenv/oauth.go index cd76b30193..54b2f04425 100644 --- a/flow/peerdbenv/oauth.go +++ b/flow/peerdbenv/oauth.go @@ -1,6 +1,9 @@ package peerdbenv -import "strconv" +import ( + "log/slog" + "strconv" +) type PeerDBOAuthConfig struct { // there can be more complex use cases where domain != issuer, but we handle them later if required @@ -18,6 +21,7 @@ func GetPeerDBOAuthConfig() PeerDBOAuthConfig { oauthDiscoveryEnabledString := GetEnvString("PEERDB_OAUTH_DISCOVERY_ENABLED", "false") oauthDiscoveryEnabled, err := strconv.ParseBool(oauthDiscoveryEnabledString) if err != nil { + slog.Error("failed to parse PEERDB_OAUTH_DISCOVERY_ENABLED to bool", "error", err) oauthDiscoveryEnabled = false } oauthKeysetJson := GetEnvString("PEERDB_OAUTH_KEYSET_JSON", "") From 9b4d83796f1b3faca39994ad831b1b9e4004c608 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 30 Oct 2024 18:56:47 +0000 Subject: [PATCH 05/11] improve logs pagination (#2202) using LIMIT/OFFSET is inaccurate when new logs come it while logs being viewed this makes a minor improvement by instead being based on records before/after an id from previous page, which is how I'd like to also add pagination to sync batches in mirror status --- flow/cmd/mirror_status.go | 66 ++++++++++++++--- protos/route.proto | 4 + 
ui/app/mirror-logs/table.tsx | 52 +------------ ui/app/mirrors/errors/[mirrorName]/page.tsx | 50 +------------ ui/components/LogsTable.tsx | 82 ++++++++++++++++----- 5 files changed, 128 insertions(+), 126 deletions(-) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 70efa7597b..da68b6457c 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "log/slog" + "slices" "strings" "time" @@ -581,8 +582,8 @@ func (h *FlowRequestHandler) ListMirrorLogs( ctx context.Context, req *protos.ListMirrorLogsRequest, ) (*protos.ListMirrorLogsResponse, error) { - whereExprs := make([]string, 0, 2) - whereArgs := make([]interface{}, 0, 2) + whereExprs := make([]string, 0, 3) + whereArgs := make([]any, 0, 4) if req.FlowJobName != "" { whereArgs = append(whereArgs, req.FlowJobName) whereExprs = append(whereExprs, "position($1 in flow_name) > 0") @@ -593,23 +594,47 @@ func (h *FlowRequestHandler) ListMirrorLogs( whereExprs = append(whereExprs, fmt.Sprintf("error_type = $%d", len(whereArgs))) } + // count query doesn't want paging + countWhereArgs := slices.Clone(whereArgs) + var countWhereClause string + if len(whereExprs) != 0 { + countWhereClause = " WHERE " + strings.Join(whereExprs, " AND ") + } + + sortOrderBy := "desc" + if req.BeforeId != 0 && req.AfterId != 0 { + if req.BeforeId != -1 { + whereArgs = append(whereArgs, req.BeforeId) + whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs))) + } else if req.AfterId != -1 { + whereArgs = append(whereArgs, req.AfterId) + whereExprs = append(whereExprs, fmt.Sprintf("id > $%d", len(whereArgs))) + sortOrderBy = "" + } + } + var whereClause string if len(whereExprs) != 0 { whereClause = " WHERE " + strings.Join(whereExprs, " AND ") } - skip := (req.Page - 1) * req.NumPerPage - rows, err := h.pool.Query(ctx, fmt.Sprintf(`select flow_name, error_message, error_type, error_timestamp - from peerdb_stats.flow_errors %s - order by error_timestamp desc - limit %d offset %d`, whereClause, req.NumPerPage, skip), whereArgs...) + // page is deprecated + var offsetClause string + if req.Page != 0 { + offsetClause = fmt.Sprintf(" offset %d", (req.Page-1)*req.NumPerPage) + } + + rows, err := h.pool.Query(ctx, fmt.Sprintf(`select id, flow_name, error_message, error_type, error_timestamp + from peerdb_stats.flow_errors%s + order by id %s + limit %d%s`, whereClause, sortOrderBy, req.NumPerPage, offsetClause), whereArgs...) 
if err != nil { return nil, err } mirrorErrors, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.MirrorLog, error) { var log protos.MirrorLog var errorTimestamp time.Time - if err := rows.Scan(&log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil { + if err := rows.Scan(&log.Id, &log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil { return nil, err } log.ErrorTimestamp = float64(errorTimestamp.UnixMilli()) @@ -618,14 +643,37 @@ func (h *FlowRequestHandler) ListMirrorLogs( if err != nil { return nil, err } + if sortOrderBy == "" { + slices.Reverse(mirrorErrors) + } var total int32 - if err := h.pool.QueryRow(ctx, "select count(*) from peerdb_stats.flow_errors"+whereClause, whereArgs...).Scan(&total); err != nil { + var rowsBehind int32 + if len(mirrorErrors) > 0 { + firstId := mirrorErrors[0].Id + countWhereArgs = append(countWhereArgs, firstId) + if err := h.pool.QueryRow( + ctx, + fmt.Sprintf("select count(*), count(*) filter (where id > $%d) from peerdb_stats.flow_errors%s", + len(countWhereArgs), countWhereClause), + countWhereArgs..., + ).Scan(&total, &rowsBehind); err != nil { + return nil, err + } + } else if err := h.pool.QueryRow( + ctx, "select count(*) from peerdb_stats.flow_errors"+countWhereClause, countWhereArgs..., + ).Scan(&total); err != nil { return nil, err } + page := req.Page + if page == 0 { + page = rowsBehind/req.NumPerPage + 1 + } + return &protos.ListMirrorLogsResponse{ Errors: mirrorErrors, Total: total, + Page: page, }, nil } diff --git a/protos/route.proto b/protos/route.proto index 9b85da6f47..a729f88ac4 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -354,16 +354,20 @@ message MirrorLog { string error_message = 2; string error_type = 3; double error_timestamp = 4; + int32 id = 5; } message ListMirrorLogsRequest { string flow_job_name = 1; string level = 2; int32 page = 3; int32 num_per_page = 4; + int32 before_id = 5; + int32 after_id = 6; } message ListMirrorLogsResponse { repeated MirrorLog errors = 1; int32 total = 2; + int32 page = 3; } message ValidateCDCMirrorResponse{ diff --git a/ui/app/mirror-logs/table.tsx b/ui/app/mirror-logs/table.tsx index fc9206a362..4d14c80826 100644 --- a/ui/app/mirror-logs/table.tsx +++ b/ui/app/mirror-logs/table.tsx @@ -1,14 +1,8 @@ 'use client'; import LogsTable from '@/components/LogsTable'; -import { - ListMirrorLogsRequest, - ListMirrorLogsResponse, - ListMirrorNamesResponse, - MirrorLog, -} from '@/grpc_generated/route'; +import { ListMirrorNamesResponse } from '@/grpc_generated/route'; import { ProgressCircle } from '@/lib/ProgressCircle'; -import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; import 'react-toastify/dist/ReactToastify.css'; import useSWR from 'swr'; @@ -16,7 +10,6 @@ import { useLocalStorage } from 'usehooks-ts'; import { fetcher } from '../utils/swr'; export default function LogsView() { - const [logs, setLogs] = useState([]); const [mirrorName, setMirrorName] = useLocalStorage( 'peerdbMirrorNameFilterForLogs', '' @@ -25,45 +18,9 @@ export default function LogsView() { 'peerdbLogTypeFilterForLogs', 'all' ); - const [currentPage, setCurrentPage] = useState(1); - const [totalPages, setTotalPages] = useState(1); const { data: mirrors }: { data: ListMirrorNamesResponse; error: any } = useSWR('/api/v1/mirrors/names', fetcher); - useEffect(() => { - setCurrentPage(1); - }, [mirrorName]); - - useEffect(() => { - const req: ListMirrorLogsRequest = { - level: logLevel, - flowJobName: mirrorName, - 
page: currentPage, - numPerPage: 15, - }; - - const fetchData = async () => { - try { - const response = await fetch('/api/v1/mirrors/logs', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - cache: 'no-store', - body: JSON.stringify(req), - }); - const data: ListMirrorLogsResponse = await response.json(); - const numPages = Math.ceil(data.total / req.numPerPage); - setLogs(data.errors); - setTotalPages(numPages); - } catch (error) { - console.error('Error fetching mirror logs:', error); - } - }; - - fetchData(); - }, [currentPage, mirrorName, logLevel]); - if (!mirrors) { return ; } @@ -107,12 +64,7 @@ export default function LogsView() { /> - + ); } diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index 42c36c336d..af8acfb66d 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -1,56 +1,13 @@ 'use client'; import LogsTable from '@/components/LogsTable'; -import { - ListMirrorLogsRequest, - ListMirrorLogsResponse, - MirrorLog, -} from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { useParams } from 'next/navigation'; -import { useEffect, useState } from 'react'; import { ToastContainer } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; export default function MirrorError() { const params = useParams<{ mirrorName: string }>(); - const [mirrorErrors, setMirrorErrors] = useState([]); - const [currentPage, setCurrentPage] = useState(1); - const [totalPages, setTotalPages] = useState(1); - - useEffect(() => { - setCurrentPage(1); - }, [params.mirrorName]); - - useEffect(() => { - const req: ListMirrorLogsRequest = { - flowJobName: params.mirrorName, - page: currentPage, - numPerPage: 10, - level: 'all', - }; - - const fetchData = async () => { - try { - const response = await fetch('/api/v1/mirrors/logs', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - cache: 'no-store', - body: JSON.stringify(req), - }); - const data: ListMirrorLogsResponse = await response.json(); - const numPages = Math.ceil(data.total / req.numPerPage); - setMirrorErrors(data.errors); - setTotalPages(numPages); - } catch (error) { - console.error('Error fetching mirror errors:', error); - } - }; - - fetchData(); - }, [currentPage, params.mirrorName]); return ( <> @@ -72,10 +29,9 @@ export default function MirrorError() { diff --git a/ui/components/LogsTable.tsx b/ui/components/LogsTable.tsx index 7d3486158a..c340044b44 100644 --- a/ui/components/LogsTable.tsx +++ b/ui/components/LogsTable.tsx @@ -1,9 +1,14 @@ import TimeLabel from '@/components/TimeComponent'; -import { MirrorLog } from '@/grpc_generated/route'; +import { + ListMirrorLogsRequest, + ListMirrorLogsResponse, + MirrorLog, +} from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { Table, TableCell, TableRow } from '@/lib/Table'; +import { useCallback, useEffect, useState } from 'react'; import 'react-toastify/dist/ReactToastify.css'; const colorForErrorType = (errorType: string) => { @@ -25,26 +30,63 @@ const extractFromCloneName = (mirrorOrCloneName: string) => { }; export default function LogsTable({ - logs, - currentPage, - totalPages, - setCurrentPage, + numPerPage, + mirrorName, + logLevel, }: { - logs: MirrorLog[]; - currentPage: number; - totalPages: number; - setCurrentPage: (page: number) => void; + numPerPage: number; + mirrorName: string; + 
logLevel: string; }) { - const handleNextPage = () => { - if (currentPage < totalPages) { - setCurrentPage(currentPage + 1); + const [logs, setLogs] = useState([]); + const [currentPage, setCurrentPage] = useState(1); + const [totalPages, setTotalPages] = useState(1); + const [[beforeId, afterId], setBeforeAfterId] = useState([-1, -1]); + const nextPage = useCallback(() => { + if (logs.length === 0) { + setBeforeAfterId([-1, -1]); } - }; - const handlePrevPage = () => { - if (currentPage > 1) { - setCurrentPage(currentPage - 1); + setBeforeAfterId([logs[logs.length - 1].id, -1]); + }, [logs]); + const prevPage = useCallback(() => { + if (logs.length === 0 || currentPage < 3) { + setBeforeAfterId([-1, -1]); } - }; + setBeforeAfterId([-1, logs[0].id]); + }, [logs, currentPage]); + + useEffect(() => { + const fetchData = async () => { + const req: ListMirrorLogsRequest = { + level: logLevel, + flowJobName: mirrorName, + beforeId, + afterId, + numPerPage, + page: 0, // deprecated + }; + + try { + const response = await fetch('/api/v1/mirrors/logs', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: ListMirrorLogsResponse = await response.json(); + const numPages = Math.ceil(data.total / req.numPerPage); + setLogs(data.errors); + setTotalPages(numPages); + setCurrentPage(data.page); + } catch (error) { + console.error('Error fetching mirror logs:', error); + } + }; + + fetchData(); + }, [mirrorName, logLevel, numPerPage, afterId, beforeId]); return ( - - @@ -82,7 +124,7 @@ export default function LogsTable({ }} > {logs.map((log, idx) => ( - + Date: Thu, 31 Oct 2024 19:31:04 +0530 Subject: [PATCH 06/11] drop flow refactor (#2208) move flow entries removal to an activity, and make it before dropping source and destination as that can fail and the workflow will be cancelled --- flow/activities/flowable.go | 31 +++++++++++ flow/cmd/handler.go | 103 +++++++++++++----------------------- flow/workflows/drop_flow.go | 85 ++++++++++++++++++----------- protos/flow.proto | 4 +- 4 files changed, 125 insertions(+), 98 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index cacc0ba881..cc09bae0d7 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -1065,3 +1065,34 @@ func (a *FlowableActivity) RemoveTablesFromCatalog( return err } + +func (a *FlowableActivity) RemoveFlowEntryFromCatalog(ctx context.Context, flowName string) error { + logger := log.With(activity.GetLogger(ctx), + slog.String(string(shared.FlowNameKey), flowName)) + tx, err := a.CatalogPool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction to remove flow entries from catalog: %w", err) + } + defer shared.RollbackTx(tx, slog.Default()) + + if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil { + return fmt.Errorf("unable to clear table_schema_mapping in catalog: %w", err) + } + + ct, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName) + if err != nil { + return fmt.Errorf("unable to remove flow entry in catalog: %w", err) + } + if ct.RowsAffected() == 0 { + logger.Warn("flow entry not found in catalog, 0 records deleted") + } else { + logger.Info("flow entries removed from catalog", + slog.Int("rowsAffected", int(ct.RowsAffected()))) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction to remove flow entries from catalog: %w", err) + } + + return nil +} diff 
--git a/flow/cmd/handler.go b/flow/cmd/handler.go index d9f5c27d9c..8b30331ae8 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -175,31 +175,6 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog( return shared.UpdateCDCConfigInCatalog(ctx, h.pool, slog.Default(), cfg) } -func (h *FlowRequestHandler) removeFlowEntryInCatalog( - ctx context.Context, - flowName string, -) error { - tx, err := h.pool.Begin(ctx) - if err != nil { - return fmt.Errorf("unable to begin tx to remove flow entry in catalog: %w", err) - } - defer shared.RollbackTx(tx, slog.Default()) - - if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil { - return fmt.Errorf("unable to clear table_schema_mapping to remove flow entry in catalog: %w", err) - } - - if _, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName); err != nil { - return fmt.Errorf("unable to remove flow entry in catalog: %w", err) - } - - if err := tx.Commit(ctx); err != nil { - return fmt.Errorf("unable to commit remove flow entry in catalog: %w", err) - } - - return nil -} - func (h *FlowRequestHandler) CreateQRepFlow( ctx context.Context, req *protos.CreateQRepFlowRequest, ) (*protos.CreateQRepFlowResponse, error) { @@ -295,56 +270,52 @@ func (h *FlowRequestHandler) shutdownFlow( if err != nil { slog.Error("unable to check if workflow is cdc", logs, slog.Any("error", err)) return fmt.Errorf("unable to determine if workflow is cdc: %w", err) - } else if isCdc { - cdcConfig, err := h.getFlowConfigFromCatalog(ctx, flowJobName) + } + var cdcConfig *protos.FlowConnectionConfigs + if isCdc { + cdcConfig, err = h.getFlowConfigFromCatalog(ctx, flowJobName) if err != nil { slog.Error("unable to get cdc config from catalog", logs, slog.Any("error", err)) return fmt.Errorf("unable to get cdc config from catalog: %w", err) } - workflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New()) - workflowOptions := client.StartWorkflowOptions{ - ID: workflowID, - TaskQueue: h.peerflowTaskQueueID, - TypedSearchAttributes: shared.NewSearchAttributes(flowJobName), - } + } + dropFlowWorkflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New()) + workflowOptions := client.StartWorkflowOptions{ + ID: dropFlowWorkflowID, + TaskQueue: h.peerflowTaskQueueID, + TypedSearchAttributes: shared.NewSearchAttributes(flowJobName), + } - dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, - peerflow.DropFlowWorkflow, &protos.DropFlowInput{ - FlowJobName: flowJobName, - SourcePeerName: cdcConfig.SourceName, - DestinationPeerName: cdcConfig.DestinationName, - DropFlowStats: deleteStats, - }) - if err != nil { - slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err)) - return fmt.Errorf("unable to start DropFlow workflow: %w", err) - } + dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, + peerflow.DropFlowWorkflow, &protos.DropFlowInput{ + FlowJobName: flowJobName, + DropFlowStats: deleteStats, + FlowConnectionConfigs: cdcConfig, + }) + if err != nil { + slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err)) + return fmt.Errorf("unable to start DropFlow workflow: %w", err) + } - cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) - defer cancel() + cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() - errChan := make(chan error, 1) - go func() { - errChan <- dropFlowHandle.Get(cancelCtx, nil) - }() + errChan := make(chan error, 1) + go func() { + errChan <- 
dropFlowHandle.Get(cancelCtx, nil) + }() - select { - case err := <-errChan: - if err != nil { - slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err)) - return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) - } - case <-time.After(5 * time.Minute): - if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil { - slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err)) - return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err) - } + select { + case err := <-errChan: + if err != nil { + slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err)) + return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) + } + case <-time.After(5 * time.Minute): + if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil { + slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err)) + return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err) } - } - - if err := h.removeFlowEntryInCatalog(ctx, flowJobName); err != nil { - slog.Error("unable to remove flow job entry", logs, slog.Any("error", err)) - return err } return nil diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index fe822dd66f..51bf0091a1 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -12,35 +12,27 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error { - workflow.GetLogger(ctx).Info("performing cleanup for flow", slog.String(string(shared.FlowNameKey), config.FlowJobName)) - +func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) - ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) ctx = workflow.WithDataConverter(ctx, converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter())) - dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - HeartbeatTimeout: 1 * time.Minute, - }) - var sourceError, destinationError error var sourceOk, destinationOk, canceled bool - selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-drop") + selector := workflow.NewNamedSelector(ctx, input.FlowJobName+"-drop") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { canceled = true }) - var dropSource, dropDestination, dropStats func(f workflow.Future) + var dropSource, dropDestination func(f workflow.Future) dropSource = func(f workflow.Future) { sourceError = f.Get(ctx, nil) sourceOk = sourceError == nil if !sourceOk { dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.SourcePeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.SourceName, }) selector.AddFuture(dropSourceFuture, dropSource) _ = workflow.Sleep(ctx, time.Second) @@ -51,34 +43,25 @@ func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error destinationOk = destinationError == nil if !destinationOk { dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.DestinationPeerName, + FlowJobName: input.FlowJobName, + PeerName: 
input.FlowConnectionConfigs.DestinationName, }) selector.AddFuture(dropDestinationFuture, dropDestination) _ = workflow.Sleep(ctx, time.Second) } } - dropStats = func(f workflow.Future) { - statsError := f.Get(dropStatsCtx, nil) - if statsError != nil { - // not fatal - workflow.GetLogger(ctx).Warn("failed to delete mirror stats", slog.Any("error", statsError)) - } - } + dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.SourcePeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.SourceName, }) selector.AddFuture(dropSourceFuture, dropSource) dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.DestinationPeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.DestinationName, }) + selector.AddFuture(dropDestinationFuture, dropDestination) - if config.DropFlowStats { - dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, flowable.DeleteMirrorStats, config.FlowJobName) - selector.AddFuture(dropStatsFuture, dropStats) - } for { selector.Select(ctx) @@ -89,3 +72,45 @@ func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error } } } + +func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { + ctx = workflow.WithValue(ctx, shared.FlowNameKey, input.FlowJobName) + workflow.GetLogger(ctx).Info("performing cleanup for flow", + slog.String(string(shared.FlowNameKey), input.FlowJobName)) + + if input.FlowConnectionConfigs != nil && input.DropFlowStats { + dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + }) + dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, + flowable.DeleteMirrorStats, input.FlowJobName) + err := dropStatsFuture.Get(dropStatsCtx, nil) + if err != nil { + workflow.GetLogger(ctx).Error("failed to delete mirror stats", slog.Any("error", err)) + return err + } + } + + removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + }) + removeFromCatalogFuture := workflow.ExecuteActivity(removeFlowEntriesCtx, + flowable.RemoveFlowEntryFromCatalog, input.FlowJobName) + err := removeFromCatalogFuture.Get(ctx, nil) + if err != nil { + workflow.GetLogger(ctx).Error("failed to remove flow entries from catalog", slog.Any("error", err)) + return err + } + + if input.FlowConnectionConfigs != nil { + err := executeCDCDropActivities(ctx, input) + if err != nil { + workflow.GetLogger(ctx).Error("failed to drop CDC flow", slog.Any("error", err)) + return err + } + workflow.GetLogger(ctx).Info("CDC flow dropped successfully") + } + + return nil +} diff --git a/protos/flow.proto b/protos/flow.proto index 7e24cfc528..d1681fd8d5 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -333,10 +333,10 @@ message QRepParitionResult { } message DropFlowInput { + reserved 2,3; string flow_job_name = 1; - string source_peer_name = 2; - string destination_peer_name = 3; bool drop_flow_stats = 4; + FlowConnectionConfigs flow_connection_configs = 5; } message TableSchemaDelta { From 69e355f27a246dad1d0a129cd5a77bf7efe41baf Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Fri, 1 Nov 2024 01:22:01 +0530 Subject: [PATCH 07/11] [clickhouse] handle numerics with >76 
precision as string (#2209) --- flow/connectors/clickhouse/normalize.go | 16 +++++-- flow/datatypes/numeric.go | 6 ++- flow/e2e/clickhouse/clickhouse.go | 7 +-- flow/e2e/clickhouse/peer_flow_ch_test.go | 59 ++++++++++++++++++++++++ flow/e2e/test_utils.go | 24 ++++++++++ flow/model/qvalue/avro_converter.go | 10 ++++ 6 files changed, 110 insertions(+), 12 deletions(-) diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index c971c3f692..d65d61e9d7 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -77,6 +77,16 @@ func getColName(overrides map[string]string, name string) string { return name } +func getClickhouseTypeForNumericColumn(column *protos.FieldDescription) string { + rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier) + if rawPrecision > datatypes.PeerDBClickHouseMaxPrecision { + return "String" + } else { + precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) + return fmt.Sprintf("Decimal(%d, %d)", precision, scale) + } +} + func generateCreateTableSQLForNormalizedTable( ctx context.Context, config *protos.SetupNormalizedTableBatchInput, @@ -129,8 +139,7 @@ func generateCreateTableSQLForNormalizedTable( if clickHouseType == "" { if colType == qvalue.QValueKindNumeric { - precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) - clickHouseType = fmt.Sprintf("Decimal(%d, %d)", precision, scale) + clickHouseType = getClickhouseTypeForNumericColumn(column) } else { var err error clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) @@ -323,8 +332,7 @@ func (c *ClickHouseConnector) NormalizeRecords( colSelector.WriteString(fmt.Sprintf("`%s`,", dstColName)) if clickHouseType == "" { if colType == qvalue.QValueKindNumeric { - precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) - clickHouseType = fmt.Sprintf("Decimal(%d, %d)", precision, scale) + clickHouseType = getClickhouseTypeForNumericColumn(column) } else { var err error clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 6c57071484..56c1b17839 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -5,7 +5,9 @@ const ( PeerDBBigQueryScale = 20 PeerDBSnowflakeScale = 20 PeerDBClickHouseScale = 38 - VARHDRSZ = 4 + + PeerDBClickHouseMaxPrecision = 76 + VARHDRSZ = 4 ) type WarehouseNumericCompatibility interface { @@ -17,7 +19,7 @@ type WarehouseNumericCompatibility interface { type ClickHouseNumericCompatibility struct{} func (ClickHouseNumericCompatibility) MaxPrecision() int16 { - return 76 + return PeerDBClickHouseMaxPrecision } func (ClickHouseNumericCompatibility) MaxScale() int16 { diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index 891fe55365..e1eafd6b4b 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -2,7 +2,6 @@ package e2e_clickhouse import ( "context" - "errors" "fmt" "reflect" "strings" @@ -93,13 +92,9 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch return nil, err } - firstCol, _, _ := strings.Cut(cols, ",") - if firstCol == "" { - return nil, errors.New("no columns specified") - } rows, err := ch.Query( context.Background(), - fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE 
_peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`, cols, table, firstCol), + fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY 1 SETTINGS use_query_cache = false`, cols, table), ) if err != nil { return nil, err diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 4dcee7feb5..3cf1f97597 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -4,15 +4,18 @@ import ( "context" "embed" "fmt" + "strings" "testing" "time" + "github.com/shopspring/decimal" "github.com/stretchr/testify/require" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -499,3 +502,59 @@ func (s ClickHouseSuite) Test_Weird_Table_And_Column() { env.Cancel() e2e.RequireEnvCanceled(s.t, env) } + +// large NUMERICs (precision >76) are mapped to String on CH, test +func (s ClickHouseSuite) Test_Large_Numeric() { + srcFullName := s.attachSchemaSuffix("lnumeric") + dstTableName := "lnumeric" + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s( + id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + c1 NUMERIC(76,0), + c2 NUMERIC(78,0) + ); + `, srcFullName)) + require.NoError(s.t, err) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78))) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("clickhouse_test_large_numerics"), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + flowConnConfig.DoInitialSnapshot = true + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c1,c2", 1) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78))) + require.NoError(s.t, err) + + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c1,c2", 2) + + rows, err := s.GetRows(dstTableName, "c1,c2") + require.NoError(s.t, err) + require.Len(s.t, rows.Records, 2, "expected 2 rows") + for _, row := range rows.Records { + require.Len(s.t, row, 2, "expected 2 columns") + require.Equal(s.t, qvalue.QValueKindNumeric, row[0].Kind(), "expected NUMERIC(76,0) to be Decimal") + require.Equal(s.t, qvalue.QValueKindString, row[1].Kind(), "expected NUMERIC(78,0) to be String") + c1, ok := row[0].Value().(decimal.Decimal) + require.True(s.t, ok, "expected NUMERIC(76,0) to be Decimal") + require.Equal(s.t, strings.Repeat("7", 76), c1.String(), "expected NUMERIC(76,0) to be 7s") + c2, ok := row[1].Value().(string) + require.True(s.t, ok, "expected NUMERIC(78,0) to be String") + require.Equal(s.t, strings.Repeat("9", 78), c2, "expected NUMERIC(78,0) to be 9s") + } + + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 2eca0520c9..9dadc49852 100644 --- 
a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -168,6 +168,30 @@ func EnvWaitForEqualTablesWithNames( }) } +func EnvWaitForCount( + env WorkflowRun, + suite RowSource, + reason string, + dstTable string, + cols string, + expectedCount int, +) { + t := suite.T() + t.Helper() + + EnvWaitFor(t, env, 3*time.Minute, reason, func() bool { + t.Helper() + + rows, err := suite.GetRows(dstTable, cols) + if err != nil { + t.Log(err) + return false + } + + return len(rows.Records) == expectedCount + }) +} + func RequireEnvCanceled(t *testing.T, env WorkflowRun) { t.Helper() EnvWaitForFinished(t, env, time.Minute) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index c2382e5b0c..9738f46e8f 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -103,6 +103,10 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci } return "bytes", nil case QValueKindNumeric: + if targetDWH == protos.DBType_CLICKHOUSE && + precision > datatypes.PeerDBClickHouseMaxPrecision { + return "string", nil + } avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH) return AvroSchemaNumeric{ Type: "bytes", @@ -454,6 +458,12 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} { return nil } + if c.TargetDWH == protos.DBType_CLICKHOUSE && + c.Precision > datatypes.PeerDBClickHouseMaxPrecision { + // no error returned + numStr, _ := c.processNullableUnion("string", num.String()) + return numStr + } rat := num.Rat() if c.Nullable { return goavro.Union("bytes.decimal", rat) From 93ec020d5098bb1fccaedeec0ffb12d204a69dcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 1 Nov 2024 14:10:59 +0000 Subject: [PATCH 08/11] Remove s3 path from mirror log based on ClickPipes feedback (#2212) --- flow/connectors/utils/avro/avro_writer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 9ce4064ed6..6f193be88b 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -222,8 +222,8 @@ func (p *peerDBOCFWriter) WriteRecordsToS3( }) if err != nil { s3Path := "s3://" + bucketName + "/" + key - logger.Error("failed to upload file", slog.Any("error", err), slog.Any("s3_path", s3Path)) - return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err) + logger.Error("failed to upload file", slog.Any("error", err), slog.String("s3_path", s3Path)) + return nil, fmt.Errorf("failed to upload file: %w", err) } if writeOcfError != nil { From f1a94ea94158e4c9ce1acae8d7ccfe67a5699ffe Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj <65964360+Amogh-Bharadwaj@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:19:58 +0530 Subject: [PATCH 09/11] API/GetColumns: fix foreign keys being considered pkeys (#2213) Foreign keys that were unique columns and not pkeys had their column exclusion disabled in the UI as the current query to fetch column data was counting them as primary keys. 
This PR fixes this ### QA ```sql -- Create a projects table CREATE TABLE projects ( id SERIAL PRIMARY KEY, application_id INT ); -- Create the applications table CREATE TABLE applications ( id INTEGER UNIQUE ); -- Create foreign key relationship ALTER TABLE projects ADD CONSTRAINT fk_application FOREIGN KEY (application_id) REFERENCES applications (id); ``` Before checking out to this PR, id in the applications table will be disabled in UI, post checkout - will be non-disabled --- flow/cmd/peer_data.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 9faf61c394..cb625978fe 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -297,19 +297,20 @@ func (h *FlowRequestHandler) GetColumns( defer peerConn.Close(ctx) rows, err := peerConn.Query(ctx, `SELECT - distinct attname AS column_name, - format_type(atttypid, atttypmod) AS data_type, - (attnum = ANY(conkey)) AS is_primary_key + DISTINCT attname AS column_name, + format_type(atttypid, atttypmod) AS data_type, + (pg_constraint.contype = 'p') AS is_primary_key FROM pg_attribute JOIN pg_class ON pg_attribute.attrelid = pg_class.oid - JOIN pg_namespace on pg_class.relnamespace = pg_namespace.oid + JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid LEFT JOIN pg_constraint ON pg_attribute.attrelid = pg_constraint.conrelid AND pg_attribute.attnum = ANY(pg_constraint.conkey) + AND pg_constraint.contype = 'p' WHERE pg_namespace.nspname = $1 AND relname = $2 AND pg_attribute.attnum > 0 AND NOT attisdropped - ORDER BY column_name`, req.SchemaName, req.TableName) + ORDER BY column_name;`, req.SchemaName, req.TableName) if err != nil { return nil, err } From fc59d94b51c4549495614238110bfceb91407355 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 4 Nov 2024 18:34:32 +0000 Subject: [PATCH 10/11] Sync batch pagination (#2207) Some customers have enough batches now that response size was exceeding grpc limit Changing this meant loading pages & aggregating graph data on server Removed searching specific batch id & changing sort column from ui --- flow/cmd/mirror_status.go | 124 ++++++++++++-- protos/route.proto | 33 +++- .../[mirrorId]/aggregatedCountsByInterval.ts | 4 +- ui/app/mirrors/[mirrorId]/cdc.tsx | 10 +- ui/app/mirrors/[mirrorId]/cdcDetails.tsx | 40 ++--- ui/app/mirrors/[mirrorId]/cdcGraph.tsx | 53 +++--- ui/app/mirrors/[mirrorId]/handlers.ts | 1 + ui/app/mirrors/[mirrorId]/page.tsx | 13 +- ui/app/mirrors/[mirrorId]/qrepGraph.tsx | 12 +- ui/app/mirrors/[mirrorId]/syncStatus.tsx | 9 +- ui/app/mirrors/[mirrorId]/syncStatusTable.tsx | 157 +++++++----------- ui/app/peers/[peerName]/lagGraph.tsx | 6 +- ui/app/settings/page.tsx | 3 - ui/components/LogsTable.tsx | 3 - 14 files changed, 264 insertions(+), 204 deletions(-) diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index da68b6457c..ffd6eba459 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -175,11 +175,19 @@ func (h *FlowRequestHandler) cdcFlowStatus( return nil, err } - cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{ - FlowJobName: req.FlowJobName, - Limit: 0, - }) - if err != nil { + var cdcBatches []*protos.CDCBatch + if !req.ExcludeBatches { + cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{FlowJobName: req.FlowJobName}) + if err != nil { + return nil, err + } + cdcBatches = cdcBatchesResponse.CdcBatches + } + + var rowsSynced int64 + if err := h.pool.QueryRow(ctx, + 
"select coalesce(sum(rows_in_batch), 0) from peerdb_stats.cdc_batches where flow_name=$1", req.FlowJobName, + ).Scan(&rowsSynced); err != nil { return nil, err } @@ -190,10 +198,43 @@ func (h *FlowRequestHandler) cdcFlowStatus( SnapshotStatus: &protos.SnapshotStatus{ Clones: initialLoadResponse.TableSummaries, }, - CdcBatches: cdcBatchesResponse.CdcBatches, + CdcBatches: cdcBatches, + RowsSynced: rowsSynced, }, nil } +func (h *FlowRequestHandler) CDCGraph(ctx context.Context, req *protos.GraphRequest) (*protos.GraphResponse, error) { + truncField := "minute" + switch req.AggregateType { + case "1hour": + truncField = "hour" + case "1day": + truncField = "day" + case "1month": + truncField = "month" + } + rows, err := h.pool.Query(ctx, `select tm, coalesce(sum(rows_in_batch), 0) + from generate_series(date_trunc($2, now() - $1::INTERVAL * 30), now(), $1::INTERVAL) tm + left join peerdb_stats.cdc_batches on start_time >= tm and start_time < tm + $1::INTERVAL + group by 1 order by 1`, req.AggregateType, truncField) + if err != nil { + return nil, err + } + data, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.GraphResponseItem, error) { + var t time.Time + var r int64 + if err := row.Scan(&t, &r); err != nil { + return nil, err + } + return &protos.GraphResponseItem{Time: float64(t.UnixMilli()), Rows: float64(r)}, nil + }) + if err != nil { + return nil, err + } + + return &protos.GraphResponse{Data: data}, nil +} + func (h *FlowRequestHandler) InitialLoadSummary( ctx context.Context, req *protos.InitialLoadSummaryRequest, @@ -455,18 +496,39 @@ func (h *FlowRequestHandler) getMirrorCreatedAt(ctx context.Context, flowJobName } func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) { - mirrorName := req.FlowJobName - limit := req.Limit + return h.CDCBatches(ctx, req) +} + +func (h *FlowRequestHandler) CDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) { limitClause := "" - if limit > 0 { - limitClause = fmt.Sprintf(" LIMIT %d", limit) + if req.Limit > 0 { + limitClause = fmt.Sprintf(" LIMIT %d", req.Limit) } - q := `SELECT DISTINCT ON(batch_id) batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn FROM peerdb_stats.cdc_batches - WHERE flow_name=$1 AND start_time IS NOT NULL ORDER BY batch_id DESC, start_time DESC` + limitClause - rows, err := h.pool.Query(ctx, q, mirrorName) + + whereExpr := "" + queryArgs := append(make([]any, 0, 2), req.FlowJobName) + + sortOrderBy := "desc" + if req.BeforeId != 0 || req.AfterId != 0 { + if req.BeforeId != -1 { + queryArgs = append(queryArgs, req.BeforeId) + whereExpr = fmt.Sprintf(" AND batch_id < $%d", len(queryArgs)) + } else if req.AfterId != -1 { + queryArgs = append(queryArgs, req.AfterId) + whereExpr = fmt.Sprintf(" AND batch_id > $%d", len(queryArgs)) + sortOrderBy = "asc" + } + } + + q := fmt.Sprintf(`SELECT DISTINCT ON(batch_id) + batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn + FROM peerdb_stats.cdc_batches + WHERE flow_name=$1 AND start_time IS NOT NULL%s + ORDER BY batch_id %s%s`, whereExpr, sortOrderBy, limitClause) + rows, err := h.pool.Query(ctx, q, queryArgs...) 
if err != nil { - slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", mirrorName, err.Error())) - return nil, fmt.Errorf("unable to query cdc batches - %s: %w", mirrorName, err) + slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", req.FlowJobName, err.Error())) + return nil, fmt.Errorf("unable to query cdc batches - %s: %w", req.FlowJobName, err) } batches, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.CDCBatch, error) { @@ -477,8 +539,8 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC var startLSN pgtype.Numeric var endLSN pgtype.Numeric if err := rows.Scan(&batchID, &startTime, &endTime, &numRows, &startLSN, &endLSN); err != nil { - slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", mirrorName, err.Error())) - return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", mirrorName, err) + slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", req.FlowJobName, err.Error())) + return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", req.FlowJobName, err) } var batch protos.CDCBatch @@ -511,9 +573,35 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC if batches == nil { batches = []*protos.CDCBatch{} } + if req.Ascending != (sortOrderBy == "asc") { + slices.Reverse(batches) + } + + var total int32 + var rowsBehind int32 + if len(batches) > 0 { + op := '>' + if req.Ascending { + op = '<' + } + firstId := batches[0].BatchId + if err := h.pool.QueryRow(ctx, fmt.Sprintf(`select count(distinct batch_id), count(distinct batch_id) filter (where batch_id%c$2) + from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null`, op), req.FlowJobName, firstId, + ).Scan(&total, &rowsBehind); err != nil { + return nil, err + } + } else if err := h.pool.QueryRow( + ctx, + "select count(distinct batch_id) from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null", + req.FlowJobName, + ).Scan(&total); err != nil { + return nil, err + } return &protos.GetCDCBatchesResponse{ CdcBatches: batches, + Total: total, + Page: rowsBehind/int32(req.Limit) + 1, }, nil } @@ -602,7 +690,7 @@ func (h *FlowRequestHandler) ListMirrorLogs( } sortOrderBy := "desc" - if req.BeforeId != 0 && req.AfterId != 0 { + if req.BeforeId != 0 || req.AfterId != 0 { if req.BeforeId != -1 { whereArgs = append(whereArgs, req.BeforeId) whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs))) diff --git a/protos/route.proto b/protos/route.proto index a729f88ac4..0265f221ee 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -145,6 +145,7 @@ message CreatePeerResponse { message MirrorStatusRequest { string flow_job_name = 1; bool include_flow_info = 2; + bool exclude_batches = 3; } message PartitionStatus { @@ -320,6 +321,7 @@ message CDCMirrorStatus { repeated CDCBatch cdc_batches = 3; peerdb_peers.DBType source_type = 4; peerdb_peers.DBType destination_type = 5; + int64 rows_synced = 6; } message MirrorStatusResponse { @@ -343,10 +345,29 @@ message InitialLoadSummaryResponse { message GetCDCBatchesRequest { string flow_job_name = 1; uint32 limit = 2; + bool ascending = 3; + int64 before_id = 4; + int64 after_id = 5; } message GetCDCBatchesResponse { repeated CDCBatch cdc_batches = 1; + int32 total = 2; + int32 page = 3; +} + +message GraphRequest { + string flow_job_name = 1; + string aggregate_type = 2; // TODO name? 
+} + +message GraphResponseItem { + double time = 1; + double rows = 2; +} + +message GraphResponse { + repeated GraphResponseItem data = 1; } message MirrorLog { @@ -545,11 +566,19 @@ service FlowService { } rpc GetCDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) { - option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}"}; + option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}" }; + } + + rpc CDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) { + option (google.api.http) = { post: "/v1/mirrors/cdc/batches", body: "*" }; + } + + rpc CDCGraph(GraphRequest) returns (GraphResponse) { + option (google.api.http) = { post: "/v1/mirrors/cdc/graph", body: "*" }; } rpc InitialLoadSummary(InitialLoadSummaryRequest) returns (InitialLoadSummaryResponse) { - option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}"}; + option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" }; } rpc GetPeerInfo(PeerInfoRequest) returns (PeerInfoResponse) { diff --git a/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts index 42b74e6198..92b2f6cb98 100644 --- a/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts +++ b/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts @@ -7,7 +7,7 @@ type timestampType = { count: number; }; -function aggregateCountsByInterval( +export default function aggregateCountsByInterval( timestamps: timestampType[], interval: TimeAggregateTypes ): [string, number][] { @@ -83,5 +83,3 @@ function aggregateCountsByInterval( return resultArray; } - -export default aggregateCountsByInterval; diff --git a/ui/app/mirrors/[mirrorId]/cdc.tsx b/ui/app/mirrors/[mirrorId]/cdc.tsx index e404749b01..34556379b1 100644 --- a/ui/app/mirrors/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/[mirrorId]/cdc.tsx @@ -1,5 +1,5 @@ 'use client'; -import { CDCBatch, MirrorStatusResponse } from '@/grpc_generated/route'; +import { MirrorStatusResponse } from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle'; import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react'; @@ -10,14 +10,9 @@ import { SnapshotStatusTable } from './snapshot'; type CDCMirrorStatusProps = { status: MirrorStatusResponse; - rows: CDCBatch[]; syncStatusChild?: React.ReactNode; }; -export function CDCMirror({ - status, - rows, - syncStatusChild, -}: CDCMirrorStatusProps) { +export function CDCMirror({ status, syncStatusChild }: CDCMirrorStatusProps) { const LocalStorageTabKey = 'cdctab'; const [selectedTab, setSelectedTab] = useLocalStorage(LocalStorageTabKey, 0); const [mounted, setMounted] = useState(false); @@ -60,7 +55,6 @@ export function CDCMirror({ (); - let rowsSynced = syncs.reduce((acc, sync) => { - if (sync.endTime !== null) { - return acc + Number(sync.numRows); - } - return acc; - }, 0); +export default function CdcDetails({ + createdAt, + mirrorConfig, + mirrorStatus, +}: props) { + const [syncInterval, setSyncInterval] = useState(); const tablesSynced = mirrorConfig.config?.tableMappings; useEffect(() => { - getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? '').then( - (res) => { - getSyncInterval(res); - } + getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? '').then((res) => + setSyncInterval(res) ); }, [mirrorConfig.config?.flowJobName]); return ( @@ -82,8 +77,8 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
@@ -95,8 +90,8 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
@@ -129,7 +124,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
- +
@@ -151,8 +148,7 @@ const SyncIntervalLabel: React.FC<{ syncInterval?: number }> = ({ if (!syncInterval) { return ; - } - if (syncInterval >= 3600) { + } else if (syncInterval >= 3600) { const hours = Math.floor(syncInterval / 3600); formattedInterval = `${hours} hour${hours !== 1 ? 's' : ''}`; } else if (syncInterval >= 60) { @@ -164,5 +160,3 @@ const SyncIntervalLabel: React.FC<{ syncInterval?: number }> = ({ return ; }; - -export default CdcDetails; diff --git a/ui/app/mirrors/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx index 02e2c0d26a..e022101daa 100644 --- a/ui/app/mirrors/[mirrorId]/cdcGraph.tsx +++ b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx @@ -5,35 +5,44 @@ import { TimeAggregateTypes, timeOptions, } from '@/app/utils/graph'; -import { CDCBatch } from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { BarChart } from '@tremor/react'; -import { useMemo, useState } from 'react'; +import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; -import aggregateCountsByInterval from './aggregatedCountsByInterval'; -type CdcGraphProps = { - syncs: CDCBatch[]; -}; +type CdcGraphProps = { mirrorName: string }; -function CdcGraph({ syncs }: CdcGraphProps) { - let [aggregateType, setAggregateType] = useState( +export default function CdcGraph({ mirrorName }: CdcGraphProps) { + const [aggregateType, setAggregateType] = useState( TimeAggregateTypes.HOUR ); + const [graphValues, setGraphValues] = useState< + { name: string; 'Rows synced at a point in time': number }[] + >([]); - const graphValues = useMemo(() => { - const rows = syncs.map((sync) => ({ - timestamp: sync.endTime, - count: sync.numRows, - })); - let timedRowCounts = aggregateCountsByInterval(rows, aggregateType); - timedRowCounts = timedRowCounts.slice(0, 29); - timedRowCounts = timedRowCounts.reverse(); - return timedRowCounts.map((count) => ({ - name: formatGraphLabel(new Date(count[0]), aggregateType), - 'Rows synced at a point in time': Number(count[1]), - })); - }, [syncs, aggregateType]); + useEffect(() => { + const fetchData = async () => { + const req: any = { + flowJobName: mirrorName, + aggregateType, + }; + + const res = await fetch('/api/v1/mirrors/cdc/graph', { + method: 'POST', + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: { data: { time: number; rows: number }[] } = await res.json(); + setGraphValues( + data.data.map(({ time, rows }) => ({ + name: formatGraphLabel(new Date(time), aggregateType), + 'Rows synced at a point in time': Number(rows), + })) + ); + }; + + fetchData(); + }, [mirrorName, aggregateType]); return (
@@ -59,5 +68,3 @@ function CdcGraph({ syncs }: CdcGraphProps) {
); } - -export default CdcGraph; diff --git a/ui/app/mirrors/[mirrorId]/handlers.ts b/ui/app/mirrors/[mirrorId]/handlers.ts index 7e68f26a53..0a3f46a4e2 100644 --- a/ui/app/mirrors/[mirrorId]/handlers.ts +++ b/ui/app/mirrors/[mirrorId]/handlers.ts @@ -12,6 +12,7 @@ export const getMirrorState = async ( body: JSON.stringify({ flow_job_name, include_flow_info: true, + exclude_batches: true, }), }); if (!res.ok) throw res.json(); diff --git a/ui/app/mirrors/[mirrorId]/page.tsx b/ui/app/mirrors/[mirrorId]/page.tsx index 5516e0986a..33832a66cd 100644 --- a/ui/app/mirrors/[mirrorId]/page.tsx +++ b/ui/app/mirrors/[mirrorId]/page.tsx @@ -51,12 +51,7 @@ export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) { let actionsDropdown = null; if (mirrorState?.cdcStatus) { - syncStatusChild = ( - - ); + syncStatusChild = ; const dbType = dBTypeFromJSON(mirrorState.cdcStatus.destinationType); @@ -93,11 +88,7 @@ export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) {
{mirrorId}
{actionsDropdown} - + ); } else if (mirrorState?.qrepStatus) { diff --git a/ui/app/mirrors/[mirrorId]/qrepGraph.tsx b/ui/app/mirrors/[mirrorId]/qrepGraph.tsx index 84eb958935..7bc1dbff5d 100644 --- a/ui/app/mirrors/[mirrorId]/qrepGraph.tsx +++ b/ui/app/mirrors/[mirrorId]/qrepGraph.tsx @@ -17,22 +17,20 @@ type QRepGraphProps = { }; function QrepGraph({ syncs }: QRepGraphProps) { - let [aggregateType, setAggregateType] = useState( + const [aggregateType, setAggregateType] = useState( TimeAggregateTypes.HOUR ); const initialCount: [string, number][] = []; - let [counts, setCounts] = useState(initialCount); + const [counts, setCounts] = useState(initialCount); useEffect(() => { - let rows = syncs.map((sync) => ({ + const rows = syncs.map((sync) => ({ timestamp: sync.startTime!, count: Number(sync.rowsInPartition) ?? 0, })); - let counts = aggregateCountsByInterval(rows, aggregateType); - counts = counts.slice(0, 29); - counts = counts.reverse(); - setCounts(counts); + const counts = aggregateCountsByInterval(rows, aggregateType); + setCounts(counts.slice(0, 29).reverse()); }, [aggregateType, syncs]); return ( diff --git a/ui/app/mirrors/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/[mirrorId]/syncStatus.tsx index 0c2d2ba49d..fb45bfc6e7 100644 --- a/ui/app/mirrors/[mirrorId]/syncStatus.tsx +++ b/ui/app/mirrors/[mirrorId]/syncStatus.tsx @@ -1,6 +1,6 @@ 'use client'; import { fetcher } from '@/app/utils/swr'; -import { CDCBatch, CDCTableTotalCountsResponse } from '@/grpc_generated/route'; +import { CDCTableTotalCountsResponse } from '@/grpc_generated/route'; import useSWR from 'swr'; import CdcGraph from './cdcGraph'; import RowsDisplay from './rowsDisplay'; @@ -9,10 +9,9 @@ import TableStats from './tableStats'; type SyncStatusProps = { flowJobName: string; - rows: CDCBatch[]; }; -export default function SyncStatus({ flowJobName, rows }: SyncStatusProps) { +export default function SyncStatus({ flowJobName }: SyncStatusProps) { const { data: tableStats, error, @@ -31,9 +30,9 @@ export default function SyncStatus({ flowJobName, rows }: SyncStatusProps) {
- +
- +
) diff --git a/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx index 6453408948..493822fcf3 100644 --- a/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx +++ b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx @@ -1,22 +1,21 @@ 'use client'; -import SelectTheme from '@/app/styles/select'; import TimeLabel from '@/components/TimeComponent'; -import { CDCBatch } from '@/grpc_generated/route'; +import { + CDCBatch, + GetCDCBatchesRequest, + GetCDCBatchesResponse, +} from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle'; -import { SearchField } from '@/lib/SearchField'; import { Table, TableCell, TableRow } from '@/lib/Table'; import moment from 'moment'; -import { useMemo, useState } from 'react'; -import ReactSelect from 'react-select'; +import { useCallback, useEffect, useState } from 'react'; import { RowDataFormatter } from './rowsDisplay'; -type SyncStatusTableProps = { - rows: CDCBatch[]; -}; +type SyncStatusTableProps = { mirrorName: string }; function TimeWithDurationOrRunning({ startTime, @@ -46,63 +45,54 @@ function TimeWithDurationOrRunning({ } const ROWS_PER_PAGE = 5; -const sortOptions = [ - { value: 'batchId', label: 'Batch ID' }, - { value: 'startTime', label: 'Start Time' }, - { value: 'endTime', label: 'End Time' }, - { value: 'numRows', label: 'Rows Synced' }, -]; - -export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { +export const SyncStatusTable = ({ mirrorName }: SyncStatusTableProps) => { + const [totalPages, setTotalPages] = useState(1); const [currentPage, setCurrentPage] = useState(1); - const [sortField, setSortField] = useState< - 'startTime' | 'endTime' | 'numRows' | 'batchId' - >('batchId'); - - const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('dsc'); - const totalPages = Math.ceil(rows.length / ROWS_PER_PAGE); - const [searchQuery, setSearchQuery] = useState(NaN); - const displayedRows = useMemo(() => { - const searchRows = rows.filter((row) => row.batchId == searchQuery); - const shownRows = searchRows.length > 0 ? searchRows : rows; - shownRows.sort((a, b) => { - let aValue: any = a[sortField]; - let bValue: any = b[sortField]; - if (aValue === undefined || bValue === undefined) { - return 0; - } - if (sortField === 'batchId') { - aValue = BigInt(aValue); - bValue = BigInt(bValue); - } + const [ascending, setAscending] = useState(false); + const [[beforeId, afterId], setBeforeAfterId] = useState([-1, -1]); + const [batches, setBatches] = useState([]); - if (aValue < bValue) { - return sortDir === 'dsc' ? 1 : -1; - } else if (aValue > bValue) { - return sortDir === 'dsc' ? -1 : 1; - } else { - return 0; - } - }); + useEffect(() => { + const fetchData = async () => { + const req: GetCDCBatchesRequest = { + flowJobName: mirrorName, + limit: ROWS_PER_PAGE, + beforeId: beforeId, + afterId: afterId, + ascending, + }; + const res = await fetch('/api/v1/mirrors/cdc/batches', { + method: 'POST', + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: GetCDCBatchesResponse = await res.json(); + setBatches(data.cdcBatches ?? []); + setCurrentPage(data.page); + setTotalPages(Math.ceil(data.total / req.limit)); + }; - const startRow = (currentPage - 1) * ROWS_PER_PAGE; - const endRow = startRow + ROWS_PER_PAGE; - return shownRows.length > ROWS_PER_PAGE - ? 
shownRows.slice(startRow, endRow) - : shownRows; - }, [searchQuery, currentPage, rows, sortField, sortDir]); + fetchData(); + }, [mirrorName, beforeId, afterId, ascending]); - const handlePrevPage = () => { - if (currentPage > 1) { - setCurrentPage(currentPage - 1); + const nextPage = useCallback(() => { + if (batches.length === 0) { + setBeforeAfterId([-1, ascending ? 0 : -1]); + } else if (ascending) { + setBeforeAfterId([-1, batches[batches.length - 1].batchId]); + } else { + setBeforeAfterId([batches[batches.length - 1].batchId, -1]); } - }; - - const handleNextPage = () => { - if (currentPage < totalPages) { - setCurrentPage(currentPage + 1); + }, [batches, ascending]); + const prevPage = useCallback(() => { + if (batches.length === 0 || currentPage < 3) { + setBeforeAfterId([-1, ascending ? 0 : -1]); + } else if (ascending) { + setBeforeAfterId([batches[0].batchId, -1]); + } else { + setBeforeAfterId([-1, batches[0].batchId]); } - }; + }, [batches, ascending, currentPage]); return (
{ toolbar={{ left: (
- - @@ -123,53 +113,30 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { > -
- opt.value === sortField) - ?.label, - }} - onChange={(val, _) => { - const sortVal = - (val?.value as - | 'startTime' - | 'endTime' - | 'numRows' - | 'batchId') ?? 'batchId'; - setSortField(sortVal); - }} - defaultValue={{ value: 'batchId', label: 'Batch ID' }} - theme={SelectTheme} - /> -
), - right: ( - ) => - setSearchQuery(+e.target.value) - } - /> - ), }} header={ @@ -185,7 +152,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { } > - {displayedRows.map((row) => ( + {batches.map((row) => ( diff --git a/ui/app/peers/[peerName]/lagGraph.tsx b/ui/app/peers/[peerName]/lagGraph.tsx index 7107b7d809..87b90fa8c8 100644 --- a/ui/app/peers/[peerName]/lagGraph.tsx +++ b/ui/app/peers/[peerName]/lagGraph.tsx @@ -22,7 +22,7 @@ function parseLSN(lsn: string): number { if (!lsn) return 0; const [lsn1, lsn2] = lsn.split('/'); return Number( - (BigInt(parseInt(lsn1)) << BigInt(32)) | BigInt(parseInt(lsn2)) + (BigInt(parseInt(lsn1, 16)) << BigInt(32)) | BigInt(parseInt(lsn2, 16)) ); } @@ -135,7 +135,7 @@ export default function LagGraph({ peerName }: LagGraphProps) { /> setShowLsn((val) => !val)} /> )} diff --git a/ui/app/settings/page.tsx b/ui/app/settings/page.tsx index e5bb5ad15b..7ebb1b4cd0 100644 --- a/ui/app/settings/page.tsx +++ b/ui/app/settings/page.tsx @@ -123,9 +123,6 @@ const DynamicSettingItem = ({ const updatedSetting = { ...setting, value: newValue }; await fetch('/api/v1/dynamic_settings', { method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, body: JSON.stringify(updatedSetting), }); setEditMode(false); diff --git a/ui/components/LogsTable.tsx b/ui/components/LogsTable.tsx index c340044b44..cc09deeb7e 100644 --- a/ui/components/LogsTable.tsx +++ b/ui/components/LogsTable.tsx @@ -69,9 +69,6 @@ export default function LogsTable({ try { const response = await fetch('/api/v1/mirrors/logs', { method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, cache: 'no-store', body: JSON.stringify(req), }); From 6e7a219d5991785771598caf5bdbd66f95467fd6 Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 6 Nov 2024 00:48:54 +0530 Subject: [PATCH 11/11] [clickhouse] actually call FinishQRepPartition to store partition state (#2218) otherwise partitions will repeat themselves because we think it hasn't synced --- flow/connectors/clickhouse/qrep_avro_sync.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index fa7f03cf88..edbd0392c9 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "strings" + "time" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -103,6 +104,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( ) (int, error) { dstTableName := config.DestinationTableIdentifier stagingPath := s.connector.credsProvider.BucketPath + startTime := time.Now() avroSchema, err := s.getAvroSchema(dstTableName, stream.Schema()) if err != nil { @@ -154,6 +156,11 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( return 0, err } + if err := s.connector.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil { + s.connector.logger.Error("Failed to finish QRep partition", slog.Any("error", err)) + return 0, err + } + return avroFile.NumRecords, nil }
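
The rationale for the final patch: QRep decides whether a partition still needs syncing by consulting stored partition state, so never calling FinishQRepPartition left every ClickHouse partition looking unsynced and it was replayed on each run. Below is a minimal, self-contained Go sketch of that bookkeeping pattern; the partitionStore type and helper names are hypothetical stand-ins, not PeerDB's actual connector API.

```go
// Illustrative sketch only: partitionStore and syncPartition are hypothetical
// stand-ins for the real connector/catalog API.
package main

import "fmt"

type partitionStore struct {
	done map[string]bool // partition id -> already synced?
}

func (s *partitionStore) isSynced(id string) bool { return s.done[id] }
func (s *partitionStore) markFinished(id string)  { s.done[id] = true }

// syncPartition skips partitions whose completion has been recorded.
// Forgetting the markFinished call reproduces the bug fixed above: every run
// would consider the partition unsynced and replay it.
func syncPartition(s *partitionStore, id string) {
	if s.isSynced(id) {
		fmt.Println("skipping already-synced partition", id)
		return
	}
	fmt.Println("syncing partition", id)
	s.markFinished(id)
}

func main() {
	s := &partitionStore{done: map[string]bool{}}
	syncPartition(s, "p-0")
	syncPartition(s, "p-0") // second run: skipped, no duplicate rows
}
```

In the real flow the finished state is persisted so it survives worker restarts, rather than held in memory, but the control flow is the same: check the recorded state, sync, then record completion.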