diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index cacc0ba881..cc09bae0d7 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -1065,3 +1065,34 @@ func (a *FlowableActivity) RemoveTablesFromCatalog( return err } + +func (a *FlowableActivity) RemoveFlowEntryFromCatalog(ctx context.Context, flowName string) error { + logger := log.With(activity.GetLogger(ctx), + slog.String(string(shared.FlowNameKey), flowName)) + tx, err := a.CatalogPool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction to remove flow entries from catalog: %w", err) + } + defer shared.RollbackTx(tx, slog.Default()) + + if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil { + return fmt.Errorf("unable to clear table_schema_mapping in catalog: %w", err) + } + + ct, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName) + if err != nil { + return fmt.Errorf("unable to remove flow entry in catalog: %w", err) + } + if ct.RowsAffected() == 0 { + logger.Warn("flow entry not found in catalog, 0 records deleted") + } else { + logger.Info("flow entries removed from catalog", + slog.Int("rowsAffected", int(ct.RowsAffected()))) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit transaction to remove flow entries from catalog: %w", err) + } + + return nil +} diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 66040396ce..db04efea30 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -53,6 +53,9 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, var none TPull logger := activity.GetLogger(ctx) attempt := 0 + waitInterval := time.Second + // try for 5 minutes, once per second + // after that, try indefinitely every minute for { a.CdcCacheRw.RLock() entry, ok := a.CdcCache[sessionID] @@ -63,7 +66,7 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, } return none, fmt.Errorf("expected %s, cache held %T", reflect.TypeFor[TPull]().Name(), entry.connector) } - activity.RecordHeartbeat(ctx, "wait another second for source connector") + activity.RecordHeartbeat(ctx, fmt.Sprintf("wait %s for source connector", waitInterval)) attempt += 1 if attempt > 2 { logger.Info("waiting on source connector setup", @@ -72,7 +75,12 @@ func waitForCdcCache[TPull connectors.CDCPullConnectorCore](ctx context.Context, if err := ctx.Err(); err != nil { return none, err } - time.Sleep(time.Second) + time.Sleep(waitInterval) + if attempt == 300 { + logger.Info("source connector not setup in time, transition to slow wait", + slog.String("sessionID", sessionID)) + waitInterval = time.Minute + } } } diff --git a/flow/cmd/api.go b/flow/cmd/api.go index 58e6beac0f..ca225e4292 100644 --- a/flow/cmd/api.go +++ b/flow/cmd/api.go @@ -23,8 +23,8 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" - "github.com/PeerDB-io/peer-flow/auth" "github.com/PeerDB-io/peer-flow/generated/protos" + "github.com/PeerDB-io/peer-flow/middleware" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" @@ -213,14 +213,23 @@ func APIMain(ctx context.Context, args *APIServerParams) error { return fmt.Errorf("unable to create Temporal client: %w", err) } - options, err := auth.AuthGrpcMiddleware([]string{ + authGrpcMiddleware, err := middleware.AuthGrpcMiddleware([]string{ 
grpc_health_v1.Health_Check_FullMethodName, grpc_health_v1.Health_Watch_FullMethodName, }) if err != nil { return err } - grpcServer := grpc.NewServer(options...) + + requestLoggingMiddleware := middleware.RequestLoggingMiddleWare() + + // Interceptors are executed in the order they are passed to, so unauthorized requests are not logged + interceptors := grpc.ChainUnaryInterceptor( + authGrpcMiddleware, + requestLoggingMiddleware, + ) + + grpcServer := grpc.NewServer(interceptors) catalogPool, err := peerdbenv.GetCatalogConnectionPoolFromEnv(ctx) if err != nil { diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index d9f5c27d9c..8b30331ae8 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -175,31 +175,6 @@ func (h *FlowRequestHandler) updateFlowConfigInCatalog( return shared.UpdateCDCConfigInCatalog(ctx, h.pool, slog.Default(), cfg) } -func (h *FlowRequestHandler) removeFlowEntryInCatalog( - ctx context.Context, - flowName string, -) error { - tx, err := h.pool.Begin(ctx) - if err != nil { - return fmt.Errorf("unable to begin tx to remove flow entry in catalog: %w", err) - } - defer shared.RollbackTx(tx, slog.Default()) - - if _, err := tx.Exec(ctx, "DELETE FROM table_schema_mapping WHERE flow_name=$1", flowName); err != nil { - return fmt.Errorf("unable to clear table_schema_mapping to remove flow entry in catalog: %w", err) - } - - if _, err := tx.Exec(ctx, "DELETE FROM flows WHERE name=$1", flowName); err != nil { - return fmt.Errorf("unable to remove flow entry in catalog: %w", err) - } - - if err := tx.Commit(ctx); err != nil { - return fmt.Errorf("unable to commit remove flow entry in catalog: %w", err) - } - - return nil -} - func (h *FlowRequestHandler) CreateQRepFlow( ctx context.Context, req *protos.CreateQRepFlowRequest, ) (*protos.CreateQRepFlowResponse, error) { @@ -295,56 +270,52 @@ func (h *FlowRequestHandler) shutdownFlow( if err != nil { slog.Error("unable to check if workflow is cdc", logs, slog.Any("error", err)) return fmt.Errorf("unable to determine if workflow is cdc: %w", err) - } else if isCdc { - cdcConfig, err := h.getFlowConfigFromCatalog(ctx, flowJobName) + } + var cdcConfig *protos.FlowConnectionConfigs + if isCdc { + cdcConfig, err = h.getFlowConfigFromCatalog(ctx, flowJobName) if err != nil { slog.Error("unable to get cdc config from catalog", logs, slog.Any("error", err)) return fmt.Errorf("unable to get cdc config from catalog: %w", err) } - workflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New()) - workflowOptions := client.StartWorkflowOptions{ - ID: workflowID, - TaskQueue: h.peerflowTaskQueueID, - TypedSearchAttributes: shared.NewSearchAttributes(flowJobName), - } + } + dropFlowWorkflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New()) + workflowOptions := client.StartWorkflowOptions{ + ID: dropFlowWorkflowID, + TaskQueue: h.peerflowTaskQueueID, + TypedSearchAttributes: shared.NewSearchAttributes(flowJobName), + } - dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, - peerflow.DropFlowWorkflow, &protos.DropFlowInput{ - FlowJobName: flowJobName, - SourcePeerName: cdcConfig.SourceName, - DestinationPeerName: cdcConfig.DestinationName, - DropFlowStats: deleteStats, - }) - if err != nil { - slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err)) - return fmt.Errorf("unable to start DropFlow workflow: %w", err) - } + dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, + peerflow.DropFlowWorkflow, &protos.DropFlowInput{ + FlowJobName: flowJobName, 
+ DropFlowStats: deleteStats,
+ FlowConnectionConfigs: cdcConfig,
+ })
+ if err != nil {
+ slog.Error("unable to start DropFlow workflow", logs, slog.Any("error", err))
+ return fmt.Errorf("unable to start DropFlow workflow: %w", err)
+ }

- cancelCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
- defer cancel()
+ cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+ defer cancel()

- errChan := make(chan error, 1)
- go func() {
- errChan <- dropFlowHandle.Get(cancelCtx, nil)
- }()
+ errChan := make(chan error, 1)
+ go func() {
+ errChan <- dropFlowHandle.Get(cancelCtx, nil)
+ }()

- select {
- case err := <-errChan:
- if err != nil {
- slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err))
- return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
- }
- case <-time.After(5 * time.Minute):
- if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil {
- slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err))
- return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
- }
+ select {
+ case err := <-errChan:
+ if err != nil {
+ slog.Error("DropFlow workflow did not execute successfully", logs, slog.Any("error", err))
+ return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err)
+ }
+ case <-time.After(5 * time.Minute):
+ if err := h.handleCancelWorkflow(ctx, dropFlowWorkflowID, ""); err != nil {
+ slog.Error("unable to wait for DropFlow workflow to close", logs, slog.Any("error", err))
+ return fmt.Errorf("unable to wait for DropFlow workflow to close: %w", err)
 }
- }
-
- if err := h.removeFlowEntryInCatalog(ctx, flowJobName); err != nil {
- slog.Error("unable to remove flow job entry", logs, slog.Any("error", err))
- return err
 }

 return nil
diff --git a/flow/cmd/logged_interceptor.go b/flow/cmd/logged_interceptor.go
new file mode 100644
index 0000000000..d5ee4fc972
--- /dev/null
+++ b/flow/cmd/logged_interceptor.go
@@ -0,0 +1,79 @@
+package cmd
+
+import (
+ "context"
+
+ "go.temporal.io/sdk/interceptor"
+ "go.temporal.io/sdk/workflow"
+)
+
+type LoggedWorkflowInboundInterceptor struct {
+ interceptor.WorkflowInboundInterceptorBase
+ Next interceptor.WorkflowInboundInterceptor
+}
+
+func NewLoggedWorkflowInboundInterceptor(next interceptor.WorkflowInboundInterceptor) *LoggedWorkflowInboundInterceptor {
+ return &LoggedWorkflowInboundInterceptor{
+ WorkflowInboundInterceptorBase: interceptor.WorkflowInboundInterceptorBase{Next: next},
+ Next: next,
+ }
+}
+
+func (w *LoggedWorkflowInboundInterceptor) ExecuteWorkflow(
+ ctx workflow.Context,
+ in *interceptor.ExecuteWorkflowInput,
+) (interface{}, error) {
+ // Workflow starts here
+ result, err := w.Next.ExecuteWorkflow(ctx, in)
+ // Workflow ends here
+ return result, err
+}
+
+type LoggedActivityInboundInterceptor struct {
+ interceptor.ActivityInboundInterceptorBase
+ Next interceptor.ActivityInboundInterceptor
+}
+
+func NewLoggedActivityInboundInterceptor(next interceptor.ActivityInboundInterceptor) *LoggedActivityInboundInterceptor {
+ return &LoggedActivityInboundInterceptor{
+ ActivityInboundInterceptorBase: interceptor.ActivityInboundInterceptorBase{Next: next},
+ Next: next,
+ }
+}
+
+func (c *LoggedActivityInboundInterceptor) ExecuteActivity(
+ ctx context.Context,
+ in *interceptor.ExecuteActivityInput,
+) (interface{}, error) {
+ // Activity starts here
+ out, err := c.Next.ExecuteActivity(ctx, in)
+ // Activity ends here
+ return out, err
+}
+
+type LoggedWorkerInterceptor struct {
+ 
interceptor.WorkerInterceptorBase +} + +func (c LoggedWorkerInterceptor) InterceptActivity( + ctx context.Context, + next interceptor.ActivityInboundInterceptor, +) interceptor.ActivityInboundInterceptor { + return NewLoggedActivityInboundInterceptor(next) +} + +func (c LoggedWorkerInterceptor) InterceptWorkflow( + ctx workflow.Context, + next interceptor.WorkflowInboundInterceptor, +) interceptor.WorkflowInboundInterceptor { + // Workflow intercepted here + intercepted := NewLoggedWorkflowInboundInterceptor(next) + // Workflow intercepting ends here + return intercepted +} + +func NewLoggedWorkerInterceptor() *LoggedWorkerInterceptor { + return &LoggedWorkerInterceptor{ + WorkerInterceptorBase: interceptor.WorkerInterceptorBase{}, + } +} diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go index 70efa7597b..ffd6eba459 100644 --- a/flow/cmd/mirror_status.go +++ b/flow/cmd/mirror_status.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "log/slog" + "slices" "strings" "time" @@ -174,11 +175,19 @@ func (h *FlowRequestHandler) cdcFlowStatus( return nil, err } - cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{ - FlowJobName: req.FlowJobName, - Limit: 0, - }) - if err != nil { + var cdcBatches []*protos.CDCBatch + if !req.ExcludeBatches { + cdcBatchesResponse, err := h.GetCDCBatches(ctx, &protos.GetCDCBatchesRequest{FlowJobName: req.FlowJobName}) + if err != nil { + return nil, err + } + cdcBatches = cdcBatchesResponse.CdcBatches + } + + var rowsSynced int64 + if err := h.pool.QueryRow(ctx, + "select coalesce(sum(rows_in_batch), 0) from peerdb_stats.cdc_batches where flow_name=$1", req.FlowJobName, + ).Scan(&rowsSynced); err != nil { return nil, err } @@ -189,10 +198,43 @@ func (h *FlowRequestHandler) cdcFlowStatus( SnapshotStatus: &protos.SnapshotStatus{ Clones: initialLoadResponse.TableSummaries, }, - CdcBatches: cdcBatchesResponse.CdcBatches, + CdcBatches: cdcBatches, + RowsSynced: rowsSynced, }, nil } +func (h *FlowRequestHandler) CDCGraph(ctx context.Context, req *protos.GraphRequest) (*protos.GraphResponse, error) { + truncField := "minute" + switch req.AggregateType { + case "1hour": + truncField = "hour" + case "1day": + truncField = "day" + case "1month": + truncField = "month" + } + rows, err := h.pool.Query(ctx, `select tm, coalesce(sum(rows_in_batch), 0) + from generate_series(date_trunc($2, now() - $1::INTERVAL * 30), now(), $1::INTERVAL) tm + left join peerdb_stats.cdc_batches on start_time >= tm and start_time < tm + $1::INTERVAL + group by 1 order by 1`, req.AggregateType, truncField) + if err != nil { + return nil, err + } + data, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.GraphResponseItem, error) { + var t time.Time + var r int64 + if err := row.Scan(&t, &r); err != nil { + return nil, err + } + return &protos.GraphResponseItem{Time: float64(t.UnixMilli()), Rows: float64(r)}, nil + }) + if err != nil { + return nil, err + } + + return &protos.GraphResponse{Data: data}, nil +} + func (h *FlowRequestHandler) InitialLoadSummary( ctx context.Context, req *protos.InitialLoadSummaryRequest, @@ -454,18 +496,39 @@ func (h *FlowRequestHandler) getMirrorCreatedAt(ctx context.Context, flowJobName } func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) (*protos.GetCDCBatchesResponse, error) { - mirrorName := req.FlowJobName - limit := req.Limit + return h.CDCBatches(ctx, req) +} + +func (h *FlowRequestHandler) CDCBatches(ctx context.Context, req *protos.GetCDCBatchesRequest) 
(*protos.GetCDCBatchesResponse, error) { limitClause := "" - if limit > 0 { - limitClause = fmt.Sprintf(" LIMIT %d", limit) + if req.Limit > 0 { + limitClause = fmt.Sprintf(" LIMIT %d", req.Limit) + } + + whereExpr := "" + queryArgs := append(make([]any, 0, 2), req.FlowJobName) + + sortOrderBy := "desc" + if req.BeforeId != 0 || req.AfterId != 0 { + if req.BeforeId != -1 { + queryArgs = append(queryArgs, req.BeforeId) + whereExpr = fmt.Sprintf(" AND batch_id < $%d", len(queryArgs)) + } else if req.AfterId != -1 { + queryArgs = append(queryArgs, req.AfterId) + whereExpr = fmt.Sprintf(" AND batch_id > $%d", len(queryArgs)) + sortOrderBy = "asc" + } } - q := `SELECT DISTINCT ON(batch_id) batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn FROM peerdb_stats.cdc_batches - WHERE flow_name=$1 AND start_time IS NOT NULL ORDER BY batch_id DESC, start_time DESC` + limitClause - rows, err := h.pool.Query(ctx, q, mirrorName) + + q := fmt.Sprintf(`SELECT DISTINCT ON(batch_id) + batch_id,start_time,end_time,rows_in_batch,batch_start_lsn,batch_end_lsn + FROM peerdb_stats.cdc_batches + WHERE flow_name=$1 AND start_time IS NOT NULL%s + ORDER BY batch_id %s%s`, whereExpr, sortOrderBy, limitClause) + rows, err := h.pool.Query(ctx, q, queryArgs...) if err != nil { - slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", mirrorName, err.Error())) - return nil, fmt.Errorf("unable to query cdc batches - %s: %w", mirrorName, err) + slog.Error(fmt.Sprintf("unable to query cdc batches - %s: %s", req.FlowJobName, err.Error())) + return nil, fmt.Errorf("unable to query cdc batches - %s: %w", req.FlowJobName, err) } batches, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.CDCBatch, error) { @@ -476,8 +539,8 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC var startLSN pgtype.Numeric var endLSN pgtype.Numeric if err := rows.Scan(&batchID, &startTime, &endTime, &numRows, &startLSN, &endLSN); err != nil { - slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", mirrorName, err.Error())) - return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", mirrorName, err) + slog.Error(fmt.Sprintf("unable to scan cdc batches - %s: %s", req.FlowJobName, err.Error())) + return nil, fmt.Errorf("unable to scan cdc batches - %s: %w", req.FlowJobName, err) } var batch protos.CDCBatch @@ -510,9 +573,35 @@ func (h *FlowRequestHandler) GetCDCBatches(ctx context.Context, req *protos.GetC if batches == nil { batches = []*protos.CDCBatch{} } + if req.Ascending != (sortOrderBy == "asc") { + slices.Reverse(batches) + } + + var total int32 + var rowsBehind int32 + if len(batches) > 0 { + op := '>' + if req.Ascending { + op = '<' + } + firstId := batches[0].BatchId + if err := h.pool.QueryRow(ctx, fmt.Sprintf(`select count(distinct batch_id), count(distinct batch_id) filter (where batch_id%c$2) + from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null`, op), req.FlowJobName, firstId, + ).Scan(&total, &rowsBehind); err != nil { + return nil, err + } + } else if err := h.pool.QueryRow( + ctx, + "select count(distinct batch_id) from peerdb_stats.cdc_batches where flow_name=$1 and start_time is not null", + req.FlowJobName, + ).Scan(&total); err != nil { + return nil, err + } return &protos.GetCDCBatchesResponse{ CdcBatches: batches, + Total: total, + Page: rowsBehind/int32(req.Limit) + 1, }, nil } @@ -581,8 +670,8 @@ func (h *FlowRequestHandler) ListMirrorLogs( ctx context.Context, req *protos.ListMirrorLogsRequest, ) 
(*protos.ListMirrorLogsResponse, error) { - whereExprs := make([]string, 0, 2) - whereArgs := make([]interface{}, 0, 2) + whereExprs := make([]string, 0, 3) + whereArgs := make([]any, 0, 4) if req.FlowJobName != "" { whereArgs = append(whereArgs, req.FlowJobName) whereExprs = append(whereExprs, "position($1 in flow_name) > 0") @@ -593,23 +682,47 @@ func (h *FlowRequestHandler) ListMirrorLogs( whereExprs = append(whereExprs, fmt.Sprintf("error_type = $%d", len(whereArgs))) } + // count query doesn't want paging + countWhereArgs := slices.Clone(whereArgs) + var countWhereClause string + if len(whereExprs) != 0 { + countWhereClause = " WHERE " + strings.Join(whereExprs, " AND ") + } + + sortOrderBy := "desc" + if req.BeforeId != 0 || req.AfterId != 0 { + if req.BeforeId != -1 { + whereArgs = append(whereArgs, req.BeforeId) + whereExprs = append(whereExprs, fmt.Sprintf("id < $%d", len(whereArgs))) + } else if req.AfterId != -1 { + whereArgs = append(whereArgs, req.AfterId) + whereExprs = append(whereExprs, fmt.Sprintf("id > $%d", len(whereArgs))) + sortOrderBy = "" + } + } + var whereClause string if len(whereExprs) != 0 { whereClause = " WHERE " + strings.Join(whereExprs, " AND ") } - skip := (req.Page - 1) * req.NumPerPage - rows, err := h.pool.Query(ctx, fmt.Sprintf(`select flow_name, error_message, error_type, error_timestamp - from peerdb_stats.flow_errors %s - order by error_timestamp desc - limit %d offset %d`, whereClause, req.NumPerPage, skip), whereArgs...) + // page is deprecated + var offsetClause string + if req.Page != 0 { + offsetClause = fmt.Sprintf(" offset %d", (req.Page-1)*req.NumPerPage) + } + + rows, err := h.pool.Query(ctx, fmt.Sprintf(`select id, flow_name, error_message, error_type, error_timestamp + from peerdb_stats.flow_errors%s + order by id %s + limit %d%s`, whereClause, sortOrderBy, req.NumPerPage, offsetClause), whereArgs...) 
if err != nil { return nil, err } mirrorErrors, err := pgx.CollectRows(rows, func(row pgx.CollectableRow) (*protos.MirrorLog, error) { var log protos.MirrorLog var errorTimestamp time.Time - if err := rows.Scan(&log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil { + if err := rows.Scan(&log.Id, &log.FlowName, &log.ErrorMessage, &log.ErrorType, &errorTimestamp); err != nil { return nil, err } log.ErrorTimestamp = float64(errorTimestamp.UnixMilli()) @@ -618,14 +731,37 @@ func (h *FlowRequestHandler) ListMirrorLogs( if err != nil { return nil, err } + if sortOrderBy == "" { + slices.Reverse(mirrorErrors) + } var total int32 - if err := h.pool.QueryRow(ctx, "select count(*) from peerdb_stats.flow_errors"+whereClause, whereArgs...).Scan(&total); err != nil { + var rowsBehind int32 + if len(mirrorErrors) > 0 { + firstId := mirrorErrors[0].Id + countWhereArgs = append(countWhereArgs, firstId) + if err := h.pool.QueryRow( + ctx, + fmt.Sprintf("select count(*), count(*) filter (where id > $%d) from peerdb_stats.flow_errors%s", + len(countWhereArgs), countWhereClause), + countWhereArgs..., + ).Scan(&total, &rowsBehind); err != nil { + return nil, err + } + } else if err := h.pool.QueryRow( + ctx, "select count(*) from peerdb_stats.flow_errors"+countWhereClause, countWhereArgs..., + ).Scan(&total); err != nil { return nil, err } + page := req.Page + if page == 0 { + page = rowsBehind/req.NumPerPage + 1 + } + return &protos.ListMirrorLogsResponse{ Errors: mirrorErrors, Total: total, + Page: page, }, nil } diff --git a/flow/cmd/peer_data.go b/flow/cmd/peer_data.go index 9faf61c394..cb625978fe 100644 --- a/flow/cmd/peer_data.go +++ b/flow/cmd/peer_data.go @@ -297,19 +297,20 @@ func (h *FlowRequestHandler) GetColumns( defer peerConn.Close(ctx) rows, err := peerConn.Query(ctx, `SELECT - distinct attname AS column_name, - format_type(atttypid, atttypmod) AS data_type, - (attnum = ANY(conkey)) AS is_primary_key + DISTINCT attname AS column_name, + format_type(atttypid, atttypmod) AS data_type, + (pg_constraint.contype = 'p') AS is_primary_key FROM pg_attribute JOIN pg_class ON pg_attribute.attrelid = pg_class.oid - JOIN pg_namespace on pg_class.relnamespace = pg_namespace.oid + JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid LEFT JOIN pg_constraint ON pg_attribute.attrelid = pg_constraint.conrelid AND pg_attribute.attnum = ANY(pg_constraint.conkey) + AND pg_constraint.contype = 'p' WHERE pg_namespace.nspname = $1 AND relname = $2 AND pg_attribute.attnum > 0 AND NOT attisdropped - ORDER BY column_name`, req.SchemaName, req.TableName) + ORDER BY column_name;`, req.SchemaName, req.TableName) if err != nil { return nil, err } diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index bf2809d10c..9db97288cc 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/pyroscope-go" "go.temporal.io/sdk/client" + temporalotel "go.temporal.io/sdk/contrib/opentelemetry" "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" @@ -88,6 +89,15 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { Namespace: opts.TemporalNamespace, Logger: slog.New(shared.NewSlogHandler(slog.NewJSONHandler(os.Stdout, nil))), } + if opts.EnableOtelMetrics { + metricsProvider, metricsErr := otel_metrics.SetupTemporalMetricsProvider("flow-worker") + if metricsErr != nil { + return nil, metricsErr + } + clientOptions.MetricsHandler = temporalotel.NewMetricsHandler(temporalotel.MetricsHandlerOptions{ + Meter: 
metricsProvider.Meter("temporal-sdk-go"), + }) + } if peerdbenv.PeerDBTemporalEnableCertAuth() { slog.Info("Using temporal certificate/key for authentication") @@ -136,9 +146,9 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) { cleanupOtelManagerFunc := func() {} var otelManager *otel_metrics.OtelManager if opts.EnableOtelMetrics { - metricsProvider, metricErr := otel_metrics.SetupOtelMetricsExporter("flow-worker") - if metricErr != nil { - return nil, metricErr + metricsProvider, metricsErr := otel_metrics.SetupPeerDBMetricsProvider("flow-worker") + if metricsErr != nil { + return nil, metricsErr } otelManager = &otel_metrics.OtelManager{ MetricsProvider: metricsProvider, diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index c971c3f692..d65d61e9d7 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -77,6 +77,16 @@ func getColName(overrides map[string]string, name string) string { return name } +func getClickhouseTypeForNumericColumn(column *protos.FieldDescription) string { + rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier) + if rawPrecision > datatypes.PeerDBClickHouseMaxPrecision { + return "String" + } else { + precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) + return fmt.Sprintf("Decimal(%d, %d)", precision, scale) + } +} + func generateCreateTableSQLForNormalizedTable( ctx context.Context, config *protos.SetupNormalizedTableBatchInput, @@ -129,8 +139,7 @@ func generateCreateTableSQLForNormalizedTable( if clickHouseType == "" { if colType == qvalue.QValueKindNumeric { - precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) - clickHouseType = fmt.Sprintf("Decimal(%d, %d)", precision, scale) + clickHouseType = getClickhouseTypeForNumericColumn(column) } else { var err error clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) @@ -323,8 +332,7 @@ func (c *ClickHouseConnector) NormalizeRecords( colSelector.WriteString(fmt.Sprintf("`%s`,", dstColName)) if clickHouseType == "" { if colType == qvalue.QValueKindNumeric { - precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) - clickHouseType = fmt.Sprintf("Decimal(%d, %d)", precision, scale) + clickHouseType = getClickhouseTypeForNumericColumn(column) } else { var err error clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) diff --git a/flow/connectors/clickhouse/qrep_avro_sync.go b/flow/connectors/clickhouse/qrep_avro_sync.go index fa7f03cf88..edbd0392c9 100644 --- a/flow/connectors/clickhouse/qrep_avro_sync.go +++ b/flow/connectors/clickhouse/qrep_avro_sync.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "strings" + "time" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -103,6 +104,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( ) (int, error) { dstTableName := config.DestinationTableIdentifier stagingPath := s.connector.credsProvider.BucketPath + startTime := time.Now() avroSchema, err := s.getAvroSchema(dstTableName, stream.Schema()) if err != nil { @@ -154,6 +156,11 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( return 0, err } + if err := s.connector.FinishQRepPartition(ctx, partition, config.FlowJobName, startTime); err != nil { + s.connector.logger.Error("Failed to finish QRep partition", slog.Any("error", err)) + return 0, err 
+ } + return avroFile.NumRecords, nil } diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 91eaf3eba7..a355cfa00e 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -40,8 +40,9 @@ type PostgresCDCSource struct { childToParentRelIDMapping map[uint32]uint32 // for storing schema delta audit logs to catalog - catalogPool *pgxpool.Pool - flowJobName string + catalogPool *pgxpool.Pool + hushWarnUnhandledMessageType map[pglogrepl.MessageType]struct{} + flowJobName string } type PostgresCDCConfig struct { @@ -59,18 +60,19 @@ type PostgresCDCConfig struct { // Create a new PostgresCDCSource func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource { return &PostgresCDCSource{ - PostgresConnector: c, - srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, - tableNameMapping: cdcConfig.TableNameMapping, - tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping, - relationMessageMapping: cdcConfig.RelationMessageMapping, - slot: cdcConfig.Slot, - publication: cdcConfig.Publication, - childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, - typeMap: pgtype.NewMap(), - commitLock: nil, - catalogPool: cdcConfig.CatalogPool, - flowJobName: cdcConfig.FlowJobName, + PostgresConnector: c, + srcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, + tableNameMapping: cdcConfig.TableNameMapping, + tableNameSchemaMapping: cdcConfig.TableNameSchemaMapping, + relationMessageMapping: cdcConfig.RelationMessageMapping, + slot: cdcConfig.Slot, + publication: cdcConfig.Publication, + childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, + typeMap: pgtype.NewMap(), + commitLock: nil, + catalogPool: cdcConfig.CatalogPool, + flowJobName: cdcConfig.FlowJobName, + hushWarnUnhandledMessageType: make(map[pglogrepl.MessageType]struct{}), } } @@ -678,7 +680,10 @@ func processMessage[Items model.Items]( }, nil default: - logger.Debug(fmt.Sprintf("%T not supported", msg)) + if _, ok := p.hushWarnUnhandledMessageType[msg.Type()]; !ok { + logger.Warn(fmt.Sprintf("Unhandled message type: %T", msg)) + p.hushWarnUnhandledMessageType[msg.Type()] = struct{}{} + } } return nil, nil diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index ad7ca3951d..d0087d3beb 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -26,7 +26,8 @@ import ( "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" - peerdb_gauges "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" + "github.com/PeerDB-io/peer-flow/otel_metrics" + "github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges" "github.com/PeerDB-io/peer-flow/peerdbenv" "github.com/PeerDB-io/peer-flow/shared" ) @@ -1192,10 +1193,10 @@ func (c *PostgresConnector) HandleSlotInfo( slog.Float64("LagInMB", float64(slotInfo[0].LagInMb))) alerter.AlertIfSlotLag(ctx, alertKeys, slotInfo[0]) slotMetricGauges.SlotLagGauge.Set(float64(slotInfo[0].LagInMb), attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.SlotNameKey, alertKeys.SlotName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + 
attribute.String(otel_metrics.SlotNameKey, alertKeys.SlotName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) // Also handles alerts for PeerDB user connections exceeding a given limit here res, err := getOpenConnectionsForUser(ctx, c.conn, c.config.User) @@ -1205,9 +1206,9 @@ func (c *PostgresConnector) HandleSlotInfo( } alerter.AlertIfOpenConnections(ctx, alertKeys, res) slotMetricGauges.OpenConnectionsGauge.Set(res.CurrentOpenConnections, attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) replicationRes, err := getOpenReplicationConnectionsForUser(ctx, c.conn, c.config.User) if err != nil { @@ -1216,9 +1217,9 @@ func (c *PostgresConnector) HandleSlotInfo( } slotMetricGauges.OpenReplicationConnectionsGauge.Set(replicationRes.CurrentOpenConnections, attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) var intervalSinceLastNormalize *time.Duration err = alerter.CatalogPool.QueryRow(ctx, "SELECT now()-max(end_time) FROM peerdb_stats.cdc_batches WHERE flow_name=$1", @@ -1233,9 +1234,9 @@ func (c *PostgresConnector) HandleSlotInfo( } if intervalSinceLastNormalize != nil { slotMetricGauges.IntervalSinceLastNormalizeGauge.Set(intervalSinceLastNormalize.Seconds(), attribute.NewSet( - attribute.String(peerdb_gauges.FlowNameKey, alertKeys.FlowName), - attribute.String(peerdb_gauges.PeerNameKey, alertKeys.PeerName), - attribute.String(peerdb_gauges.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) + attribute.String(otel_metrics.FlowNameKey, alertKeys.FlowName), + attribute.String(otel_metrics.PeerNameKey, alertKeys.PeerName), + attribute.String(otel_metrics.DeploymentUidKey, peerdbenv.PeerDBDeploymentUID()))) alerter.AlertIfTooLongSinceLastNormalize(ctx, alertKeys, *intervalSinceLastNormalize) } diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 9ce4064ed6..6f193be88b 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -222,8 +222,8 @@ func (p *peerDBOCFWriter) WriteRecordsToS3( }) if err != nil { s3Path := "s3://" + bucketName + "/" + key - logger.Error("failed to upload file", slog.Any("error", err), slog.Any("s3_path", s3Path)) - return nil, fmt.Errorf("failed to upload file to path %s: %w", s3Path, err) + logger.Error("failed to upload file", slog.Any("error", err), slog.String("s3_path", s3Path)) + return nil, fmt.Errorf("failed to upload file: %w", err) } if writeOcfError != nil { diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 6c57071484..56c1b17839 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -5,7 +5,9 @@ const ( PeerDBBigQueryScale = 20 PeerDBSnowflakeScale = 20 PeerDBClickHouseScale = 38 - 
VARHDRSZ = 4 + + PeerDBClickHouseMaxPrecision = 76 + VARHDRSZ = 4 ) type WarehouseNumericCompatibility interface { @@ -17,7 +19,7 @@ type WarehouseNumericCompatibility interface { type ClickHouseNumericCompatibility struct{} func (ClickHouseNumericCompatibility) MaxPrecision() int16 { - return 76 + return PeerDBClickHouseMaxPrecision } func (ClickHouseNumericCompatibility) MaxScale() int16 { diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index 891fe55365..e1eafd6b4b 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -2,7 +2,6 @@ package e2e_clickhouse import ( "context" - "errors" "fmt" "reflect" "strings" @@ -93,13 +92,9 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch return nil, err } - firstCol, _, _ := strings.Cut(cols, ",") - if firstCol == "" { - return nil, errors.New("no columns specified") - } rows, err := ch.Query( context.Background(), - fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`, cols, table, firstCol), + fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY 1 SETTINGS use_query_cache = false`, cols, table), ) if err != nil { return nil, err diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 4dcee7feb5..3cf1f97597 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -4,15 +4,18 @@ import ( "context" "embed" "fmt" + "strings" "testing" "time" + "github.com/shopspring/decimal" "github.com/stretchr/testify/require" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -499,3 +502,59 @@ func (s ClickHouseSuite) Test_Weird_Table_And_Column() { env.Cancel() e2e.RequireEnvCanceled(s.t, env) } + +// large NUMERICs (precision >76) are mapped to String on CH, test +func (s ClickHouseSuite) Test_Large_Numeric() { + srcFullName := s.attachSchemaSuffix("lnumeric") + dstTableName := "lnumeric" + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s( + id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + c1 NUMERIC(76,0), + c2 NUMERIC(78,0) + ); + `, srcFullName)) + require.NoError(s.t, err) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78))) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("clickhouse_test_large_numerics"), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + flowConnConfig.DoInitialSnapshot = true + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c1,c2", 1) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78))) + require.NoError(s.t, err) + + e2e.EnvWaitForCount(env, s, "waiting for 
CDC count", dstTableName, "id,c1,c2", 2) + + rows, err := s.GetRows(dstTableName, "c1,c2") + require.NoError(s.t, err) + require.Len(s.t, rows.Records, 2, "expected 2 rows") + for _, row := range rows.Records { + require.Len(s.t, row, 2, "expected 2 columns") + require.Equal(s.t, qvalue.QValueKindNumeric, row[0].Kind(), "expected NUMERIC(76,0) to be Decimal") + require.Equal(s.t, qvalue.QValueKindString, row[1].Kind(), "expected NUMERIC(78,0) to be String") + c1, ok := row[0].Value().(decimal.Decimal) + require.True(s.t, ok, "expected NUMERIC(76,0) to be Decimal") + require.Equal(s.t, strings.Repeat("7", 76), c1.String(), "expected NUMERIC(76,0) to be 7s") + c2, ok := row[1].Value().(string) + require.True(s.t, ok, "expected NUMERIC(78,0) to be String") + require.Equal(s.t, strings.Repeat("9", 78), c2, "expected NUMERIC(78,0) to be 9s") + } + + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 2eca0520c9..9dadc49852 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -168,6 +168,30 @@ func EnvWaitForEqualTablesWithNames( }) } +func EnvWaitForCount( + env WorkflowRun, + suite RowSource, + reason string, + dstTable string, + cols string, + expectedCount int, +) { + t := suite.T() + t.Helper() + + EnvWaitFor(t, env, 3*time.Minute, reason, func() bool { + t.Helper() + + rows, err := suite.GetRows(dstTable, cols) + if err != nil { + t.Log(err) + return false + } + + return len(rows.Records) == expectedCount + }) +} + func RequireEnvCanceled(t *testing.T, env WorkflowRun) { t.Helper() EnvWaitForFinished(t, env, time.Minute) diff --git a/flow/go.mod b/flow/go.mod index e6c76516d7..e24ffa9fb0 100644 --- a/flow/go.mod +++ b/flow/go.mod @@ -56,11 +56,16 @@ require ( go.opentelemetry.io/otel v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 go.opentelemetry.io/otel/metric v1.31.0 go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 + go.opentelemetry.io/otel/trace v1.31.0 go.temporal.io/api v1.40.0 go.temporal.io/sdk v1.29.1 + go.temporal.io/sdk/contrib/opentelemetry v0.6.0 go.uber.org/automaxprocs v1.6.0 golang.org/x/crypto v0.28.0 golang.org/x/sync v0.8.0 @@ -139,7 +144,6 @@ require ( go.opentelemetry.io/contrib/detectors/gcp v1.31.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect - go.opentelemetry.io/otel/trace v1.31.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/term v0.25.0 // indirect diff --git a/flow/go.sum b/flow/go.sum index b7992ec679..8f783af565 100644 --- a/flow/go.sum +++ b/flow/go.sum @@ -490,6 +490,12 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0 h1:FZ6 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0/go.mod h1:MdEu/mC6j3D+tTEfvI15b5Ci2Fn7NneJ71YMoiS3tpI= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0 h1:ZsXq73BERAiNuuFXYqP4MR5hBrjXfMGSO+Cx7qoOZiM= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0/go.mod h1:hg1zaDMpyZJuUzjFxFsRYBoccE86tM9Uf4IqNMUxvrY= 
+go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 h1:K0XaT3DwHAcV4nKLzcQvwAgSyisUghWoY20I7huthMk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0/go.mod h1:B5Ki776z/MBnVha1Nzwp5arlzBbE3+1jk+pGmaP5HME= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 h1:FFeLy03iVTXP6ffeN2iXrxfGsZGCjVx0/4KlizjyBwU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0/go.mod h1:TMu73/k1CP8nBUpDLc71Wj/Kf7ZS9FK5b53VapRsP9o= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= @@ -504,10 +510,14 @@ go.temporal.io/api v1.40.0 h1:rH3HvUUCFr0oecQTBW5tI6DdDQsX2Xb6OFVgt/bvLto= go.temporal.io/api v1.40.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis= go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0= go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ= +go.temporal.io/sdk/contrib/opentelemetry v0.6.0 h1:rNBArDj5iTUkcMwKocUShoAW59o6HdS7Nq4CTp4ldj8= +go.temporal.io/sdk/contrib/opentelemetry v0.6.0/go.mod h1:Lem8VrE2ks8P+FYcRM3UphPoBr+tfM3v/Kaf0qStzSg= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/flow/middleware/logging.go b/flow/middleware/logging.go new file mode 100644 index 0000000000..51932700fe --- /dev/null +++ b/flow/middleware/logging.go @@ -0,0 +1,31 @@ +package middleware + +import ( + "context" + "log/slog" + + "google.golang.org/grpc" + + "github.com/PeerDB-io/peer-flow/peerdbenv" +) + +func RequestLoggingMiddleWare() grpc.UnaryServerInterceptor { + if !peerdbenv.PeerDBRAPIRequestLoggingEnabled() { + slog.Info("Request Logging Interceptor is disabled") + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + return handler(ctx, req) + } + } + slog.Info("Setting up request logging middleware") + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + slog.Info("Received gRPC request", slog.String("method", info.FullMethod)) + + resp, err := handler(ctx, req) + if err != nil { + slog.Error("gRPC request failed", slog.String("method", info.FullMethod), slog.Any("error", err)) + } else { + slog.Info("gRPC request completed successfully", slog.String("method", info.FullMethod)) + } + return resp, err + } +} diff --git a/flow/auth/middleware.go b/flow/middleware/oauth.go similarity index 81% rename from 
flow/auth/middleware.go rename to flow/middleware/oauth.go index bb3ee34da5..52bbc03672 100644 --- a/flow/auth/middleware.go +++ b/flow/middleware/oauth.go @@ -1,4 +1,4 @@ -package auth +package middleware import ( "context" @@ -34,7 +34,7 @@ type identityProvider struct { issuer string } -func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, error) { +func AuthGrpcMiddleware(unauthenticatedMethods []string) (grpc.UnaryServerInterceptor, error) { oauthConfig := peerdbenv.GetPeerDBOAuthConfig() oauthJwtClaims := map[string]string{} if oauthConfig.OAuthJwtClaimKey != "" { @@ -57,7 +57,9 @@ func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, e slog.Warn("authentication is disabled") - return nil, nil + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + return handler(ctx, req) + }, nil } if err != nil { @@ -68,36 +70,24 @@ func AuthGrpcMiddleware(unauthenticatedMethods []string) ([]grpc.ServerOption, e for _, method := range unauthenticatedMethods { unauthenticatedMethodsMap[method] = struct{}{} } - return []grpc.ServerOption{ - grpc.ChainUnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - slog.Info("Received gRPC request", slog.String("method", info.FullMethod)) - - if _, unauthorized := unauthenticatedMethodsMap[info.FullMethod]; !unauthorized { - var authHeader string - authHeaders := metadata.ValueFromIncomingContext(ctx, "Authorization") - if len(authHeaders) == 1 { - authHeader = authHeaders[0] - } else if len(authHeaders) > 1 { - slog.Warn("Multiple Authorization headers supplied, request rejected", slog.String("method", info.FullMethod)) - return nil, status.Errorf(codes.Unauthenticated, "multiple Authorization headers supplied, request rejected") - } - _, err := validateRequestToken(authHeader, cfg.OauthJwtCustomClaims, ip...) - if err != nil { - slog.Debug("Failed to validate request token", slog.String("method", info.FullMethod), slog.Any("error", err)) - return nil, status.Errorf(codes.Unauthenticated, "%s", err.Error()) - } + return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + if _, unauthorized := unauthenticatedMethodsMap[info.FullMethod]; !unauthorized { + var authHeader string + authHeaders := metadata.ValueFromIncomingContext(ctx, "Authorization") + if len(authHeaders) == 1 { + authHeader = authHeaders[0] + } else if len(authHeaders) > 1 { + slog.Warn("Multiple Authorization headers supplied, request rejected", slog.String("method", info.FullMethod)) + return nil, status.Errorf(codes.Unauthenticated, "multiple Authorization headers supplied, request rejected") } - - resp, err := handler(ctx, req) - + _, err := validateRequestToken(authHeader, cfg.OauthJwtCustomClaims, ip...) 
if err != nil { - slog.Error("gRPC request failed", slog.String("method", info.FullMethod), slog.Any("error", err)) - } else { - slog.Info("gRPC request completed successfully", slog.String("method", info.FullMethod)) + slog.Debug("Failed to validate request token", slog.String("method", info.FullMethod), slog.Any("error", err)) + return nil, status.Errorf(codes.Unauthenticated, "%s", err.Error()) } + } - return resp, err - }), + return handler(ctx, req) }, nil } diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index c2382e5b0c..9738f46e8f 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -103,6 +103,10 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci } return "bytes", nil case QValueKindNumeric: + if targetDWH == protos.DBType_CLICKHOUSE && + precision > datatypes.PeerDBClickHouseMaxPrecision { + return "string", nil + } avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH) return AvroSchemaNumeric{ Type: "bytes", @@ -454,6 +458,12 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} { return nil } + if c.TargetDWH == protos.DBType_CLICKHOUSE && + c.Precision > datatypes.PeerDBClickHouseMaxPrecision { + // no error returned + numStr, _ := c.processNullableUnion("string", num.String()) + return numStr + } rat := num.Rat() if c.Nullable { return goavro.Union("bytes.decimal", rat) diff --git a/flow/otel_metrics/peerdb_gauges/attributes.go b/flow/otel_metrics/attributes.go similarity index 88% rename from flow/otel_metrics/peerdb_gauges/attributes.go rename to flow/otel_metrics/attributes.go index 78b54b6119..bd17cfeeb2 100644 --- a/flow/otel_metrics/peerdb_gauges/attributes.go +++ b/flow/otel_metrics/attributes.go @@ -1,4 +1,4 @@ -package peerdb_gauges +package otel_metrics const ( PeerNameKey string = "peerName" diff --git a/flow/otel_metrics/env.go b/flow/otel_metrics/env.go new file mode 100644 index 0000000000..81b5d0c3ea --- /dev/null +++ b/flow/otel_metrics/env.go @@ -0,0 +1,11 @@ +package otel_metrics + +import "github.com/PeerDB-io/peer-flow/peerdbenv" + +func GetPeerDBOtelMetricsNamespace() string { + return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") +} + +func GetPeerDBOtelTemporalMetricsExportListEnv() string { + return peerdbenv.GetEnvString("PEERDB_OTEL_TEMPORAL_METRICS_EXPORT_LIST", "") +} diff --git a/flow/otel_metrics/otel_manager.go b/flow/otel_metrics/otel_manager.go index 112124d203..becf13a16f 100644 --- a/flow/otel_metrics/otel_manager.go +++ b/flow/otel_metrics/otel_manager.go @@ -3,7 +3,10 @@ package otel_metrics import ( "context" "fmt" + "log/slog" + "strings" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" "go.opentelemetry.io/otel/metric" @@ -22,12 +25,16 @@ type OtelManager struct { } // newOtelResource returns a resource describing this application. -func newOtelResource(otelServiceName string) (*resource.Resource, error) { +func newOtelResource(otelServiceName string, attrs ...attribute.KeyValue) (*resource.Resource, error) { + allAttrs := []attribute.KeyValue{ + semconv.ServiceNameKey.String(otelServiceName), + } + allAttrs = append(allAttrs, attrs...) 
r, err := resource.Merge( resource.Default(), resource.NewWithAttributes( semconv.SchemaURL, - semconv.ServiceNameKey.String(otelServiceName), + allAttrs..., ), ) @@ -42,7 +49,53 @@ func setupGrpcOtelMetricsExporter() (sdkmetric.Exporter, error) { return otlpmetricgrpc.New(context.Background()) } -func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider, error) { +func temporalMetricsFilteringView() sdkmetric.View { + exportListString := GetPeerDBOtelTemporalMetricsExportListEnv() + slog.Info("Found export list for temporal metrics", slog.String("exportList", exportListString)) + // Special case for exporting all metrics + if exportListString == "__ALL__" { + return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { + stream := sdkmetric.Stream{ + Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Description: instrument.Description, + Unit: instrument.Unit, + } + return stream, true + } + } + exportList := strings.Split(exportListString, ",") + // Don't export any metrics if the list is empty + if len(exportList) == 0 { + return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { + return sdkmetric.Stream{ + Name: GetPeerDBOtelMetricsNamespace() + "temporal." + instrument.Name, + Description: instrument.Description, + Unit: instrument.Unit, + Aggregation: sdkmetric.AggregationDrop{}, + }, true + } + } + + // Export only the metrics in the list + enabledMetrics := make(map[string]struct{}, len(exportList)) + for _, metricName := range exportList { + trimmedMetricName := strings.TrimSpace(metricName) + enabledMetrics[trimmedMetricName] = struct{}{} + } + return func(instrument sdkmetric.Instrument) (sdkmetric.Stream, bool) { + stream := sdkmetric.Stream{ + Name: GetPeerDBOtelMetricsNamespace() + "temporal." 
+ instrument.Name, + Description: instrument.Description, + Unit: instrument.Unit, + } + if _, ok := enabledMetrics[instrument.Name]; !ok { + stream.Aggregation = sdkmetric.AggregationDrop{} + } + return stream, true + } +} + +func setupExporter() (sdkmetric.Exporter, error) { otlpMetricProtocol := peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_PROTOCOL", peerdbenv.GetEnvString("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL", "http/protobuf")) var metricExporter sdkmetric.Exporter @@ -58,14 +111,35 @@ func SetupOtelMetricsExporter(otelServiceName string) (*sdkmetric.MeterProvider, if err != nil { return nil, fmt.Errorf("failed to create OpenTelemetry metrics exporter: %w", err) } - otelResource, err := newOtelResource(otelServiceName) + return metricExporter, err +} + +func setupMetricsProvider(otelResource *resource.Resource, views ...sdkmetric.View) (*sdkmetric.MeterProvider, error) { + metricExporter, err := setupExporter() if err != nil { - return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + return nil, err } meterProvider := sdkmetric.NewMeterProvider( sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter)), sdkmetric.WithResource(otelResource), + sdkmetric.WithView(views...), ) return meterProvider, nil } + +func SetupPeerDBMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) { + otelResource, err := newOtelResource(otelServiceName) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } + return setupMetricsProvider(otelResource) +} + +func SetupTemporalMetricsProvider(otelServiceName string) (*sdkmetric.MeterProvider, error) { + otelResource, err := newOtelResource(otelServiceName, attribute.String(DeploymentUidKey, peerdbenv.PeerDBDeploymentUID())) + if err != nil { + return nil, fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } + return setupMetricsProvider(otelResource, temporalMetricsFilteringView()) +} diff --git a/flow/otel_metrics/peerdb_gauges/gauges.go b/flow/otel_metrics/peerdb_gauges/gauges.go index 6f8f4f0c54..767aac0945 100644 --- a/flow/otel_metrics/peerdb_gauges/gauges.go +++ b/flow/otel_metrics/peerdb_gauges/gauges.go @@ -2,7 +2,6 @@ package peerdb_gauges import ( "github.com/PeerDB-io/peer-flow/otel_metrics" - "github.com/PeerDB-io/peer-flow/peerdbenv" ) const ( @@ -20,5 +19,5 @@ type SlotMetricGauges struct { } func BuildGaugeName(baseGaugeName string) string { - return peerdbenv.GetEnvString("PEERDB_OTEL_METRICS_NAMESPACE", "") + baseGaugeName + return otel_metrics.GetPeerDBOtelMetricsNamespace() + baseGaugeName } diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index ecae67037f..e033b87195 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log/slog" + "strconv" "strings" "time" @@ -156,3 +157,12 @@ func PeerDBGetIncidentIoUrl() string { func PeerDBGetIncidentIoToken() string { return GetEnvString("PEERDB_INCIDENTIO_TOKEN", "") } + +func PeerDBRAPIRequestLoggingEnabled() bool { + requestLoggingEnabled, err := strconv.ParseBool(GetEnvString("PEERDB_API_REQUEST_LOGGING_ENABLED", "false")) + if err != nil { + slog.Error("failed to parse PEERDB_API_REQUEST_LOGGING_ENABLED to bool", "error", err) + return false + } + return requestLoggingEnabled +} diff --git a/flow/peerdbenv/oauth.go b/flow/peerdbenv/oauth.go index cd76b30193..54b2f04425 100644 --- a/flow/peerdbenv/oauth.go +++ b/flow/peerdbenv/oauth.go @@ -1,6 +1,9 @@ package peerdbenv -import "strconv" +import ( + "log/slog" + 
"strconv" +) type PeerDBOAuthConfig struct { // there can be more complex use cases where domain != issuer, but we handle them later if required @@ -18,6 +21,7 @@ func GetPeerDBOAuthConfig() PeerDBOAuthConfig { oauthDiscoveryEnabledString := GetEnvString("PEERDB_OAUTH_DISCOVERY_ENABLED", "false") oauthDiscoveryEnabled, err := strconv.ParseBool(oauthDiscoveryEnabledString) if err != nil { + slog.Error("failed to parse PEERDB_OAUTH_DISCOVERY_ENABLED to bool", "error", err) oauthDiscoveryEnabled = false } oauthKeysetJson := GetEnvString("PEERDB_OAUTH_KEYSET_JSON", "") diff --git a/flow/workflows/drop_flow.go b/flow/workflows/drop_flow.go index fe822dd66f..51bf0091a1 100644 --- a/flow/workflows/drop_flow.go +++ b/flow/workflows/drop_flow.go @@ -12,35 +12,27 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error { - workflow.GetLogger(ctx).Info("performing cleanup for flow", slog.String(string(shared.FlowNameKey), config.FlowJobName)) - +func executeCDCDropActivities(ctx workflow.Context, input *protos.DropFlowInput) error { ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) - ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) ctx = workflow.WithDataConverter(ctx, converter.NewCompositeDataConverter(converter.NewJSONPayloadConverter())) - dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - HeartbeatTimeout: 1 * time.Minute, - }) - var sourceError, destinationError error var sourceOk, destinationOk, canceled bool - selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-drop") + selector := workflow.NewNamedSelector(ctx, input.FlowJobName+"-drop") selector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { canceled = true }) - var dropSource, dropDestination, dropStats func(f workflow.Future) + var dropSource, dropDestination func(f workflow.Future) dropSource = func(f workflow.Future) { sourceError = f.Get(ctx, nil) sourceOk = sourceError == nil if !sourceOk { dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.SourcePeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.SourceName, }) selector.AddFuture(dropSourceFuture, dropSource) _ = workflow.Sleep(ctx, time.Second) @@ -51,34 +43,25 @@ func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error destinationOk = destinationError == nil if !destinationOk { dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.DestinationPeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.DestinationName, }) selector.AddFuture(dropDestinationFuture, dropDestination) _ = workflow.Sleep(ctx, time.Second) } } - dropStats = func(f workflow.Future) { - statsError := f.Get(dropStatsCtx, nil) - if statsError != nil { - // not fatal - workflow.GetLogger(ctx).Warn("failed to delete mirror stats", slog.Any("error", statsError)) - } - } + dropSourceFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowSource, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.SourcePeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.SourceName, }) selector.AddFuture(dropSourceFuture, 
dropSource) dropDestinationFuture := workflow.ExecuteActivity(ctx, flowable.DropFlowDestination, &protos.DropFlowActivityInput{ - FlowJobName: config.FlowJobName, - PeerName: config.DestinationPeerName, + FlowJobName: input.FlowJobName, + PeerName: input.FlowConnectionConfigs.DestinationName, }) + selector.AddFuture(dropDestinationFuture, dropDestination) - if config.DropFlowStats { - dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, flowable.DeleteMirrorStats, config.FlowJobName) - selector.AddFuture(dropStatsFuture, dropStats) - } for { selector.Select(ctx) @@ -89,3 +72,45 @@ func DropFlowWorkflow(ctx workflow.Context, config *protos.DropFlowInput) error } } } + +func DropFlowWorkflow(ctx workflow.Context, input *protos.DropFlowInput) error { + ctx = workflow.WithValue(ctx, shared.FlowNameKey, input.FlowJobName) + workflow.GetLogger(ctx).Info("performing cleanup for flow", + slog.String(string(shared.FlowNameKey), input.FlowJobName)) + + if input.FlowConnectionConfigs != nil && input.DropFlowStats { + dropStatsCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + }) + dropStatsFuture := workflow.ExecuteActivity(dropStatsCtx, + flowable.DeleteMirrorStats, input.FlowJobName) + err := dropStatsFuture.Get(dropStatsCtx, nil) + if err != nil { + workflow.GetLogger(ctx).Error("failed to delete mirror stats", slog.Any("error", err)) + return err + } + } + + removeFlowEntriesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + }) + removeFromCatalogFuture := workflow.ExecuteActivity(removeFlowEntriesCtx, + flowable.RemoveFlowEntryFromCatalog, input.FlowJobName) + err := removeFromCatalogFuture.Get(ctx, nil) + if err != nil { + workflow.GetLogger(ctx).Error("failed to remove flow entries from catalog", slog.Any("error", err)) + return err + } + + if input.FlowConnectionConfigs != nil { + err := executeCDCDropActivities(ctx, input) + if err != nil { + workflow.GetLogger(ctx).Error("failed to drop CDC flow", slog.Any("error", err)) + return err + } + workflow.GetLogger(ctx).Info("CDC flow dropped successfully") + } + + return nil +} diff --git a/protos/flow.proto b/protos/flow.proto index 7e24cfc528..d1681fd8d5 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -333,10 +333,10 @@ message QRepParitionResult { } message DropFlowInput { + reserved 2,3; string flow_job_name = 1; - string source_peer_name = 2; - string destination_peer_name = 3; bool drop_flow_stats = 4; + FlowConnectionConfigs flow_connection_configs = 5; } message TableSchemaDelta { diff --git a/protos/route.proto b/protos/route.proto index 9b85da6f47..0265f221ee 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -145,6 +145,7 @@ message CreatePeerResponse { message MirrorStatusRequest { string flow_job_name = 1; bool include_flow_info = 2; + bool exclude_batches = 3; } message PartitionStatus { @@ -320,6 +321,7 @@ message CDCMirrorStatus { repeated CDCBatch cdc_batches = 3; peerdb_peers.DBType source_type = 4; peerdb_peers.DBType destination_type = 5; + int64 rows_synced = 6; } message MirrorStatusResponse { @@ -343,10 +345,29 @@ message InitialLoadSummaryResponse { message GetCDCBatchesRequest { string flow_job_name = 1; uint32 limit = 2; + bool ascending = 3; + int64 before_id = 4; + int64 after_id = 5; } message GetCDCBatchesResponse { repeated CDCBatch cdc_batches = 1; + int32 total = 2; + int32 page = 3; +} + +message GraphRequest { + string flow_job_name = 1; 
+ string aggregate_type = 2; // TODO name? +} + +message GraphResponseItem { + double time = 1; + double rows = 2; +} + +message GraphResponse { + repeated GraphResponseItem data = 1; } message MirrorLog { @@ -354,16 +375,20 @@ message MirrorLog { string error_message = 2; string error_type = 3; double error_timestamp = 4; + int32 id = 5; } message ListMirrorLogsRequest { string flow_job_name = 1; string level = 2; int32 page = 3; int32 num_per_page = 4; + int32 before_id = 5; + int32 after_id = 6; } message ListMirrorLogsResponse { repeated MirrorLog errors = 1; int32 total = 2; + int32 page = 3; } message ValidateCDCMirrorResponse{ @@ -541,11 +566,19 @@ service FlowService { } rpc GetCDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) { - option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}"}; + option (google.api.http) = { get: "/v1/mirrors/cdc/batches/{flow_job_name}" }; + } + + rpc CDCBatches(GetCDCBatchesRequest) returns (GetCDCBatchesResponse) { + option (google.api.http) = { post: "/v1/mirrors/cdc/batches", body: "*" }; + } + + rpc CDCGraph(GraphRequest) returns (GraphResponse) { + option (google.api.http) = { post: "/v1/mirrors/cdc/graph", body: "*" }; } rpc InitialLoadSummary(InitialLoadSummaryRequest) returns (InitialLoadSummaryResponse) { - option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}"}; + option (google.api.http) = { get: "/v1/mirrors/cdc/initial_load/{parent_mirror_name}" }; } rpc GetPeerInfo(PeerInfoRequest) returns (PeerInfoResponse) { diff --git a/ui/app/mirror-logs/table.tsx b/ui/app/mirror-logs/table.tsx index fc9206a362..4d14c80826 100644 --- a/ui/app/mirror-logs/table.tsx +++ b/ui/app/mirror-logs/table.tsx @@ -1,14 +1,8 @@ 'use client'; import LogsTable from '@/components/LogsTable'; -import { - ListMirrorLogsRequest, - ListMirrorLogsResponse, - ListMirrorNamesResponse, - MirrorLog, -} from '@/grpc_generated/route'; +import { ListMirrorNamesResponse } from '@/grpc_generated/route'; import { ProgressCircle } from '@/lib/ProgressCircle'; -import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; import 'react-toastify/dist/ReactToastify.css'; import useSWR from 'swr'; @@ -16,7 +10,6 @@ import { useLocalStorage } from 'usehooks-ts'; import { fetcher } from '../utils/swr'; export default function LogsView() { - const [logs, setLogs] = useState([]); const [mirrorName, setMirrorName] = useLocalStorage( 'peerdbMirrorNameFilterForLogs', '' @@ -25,45 +18,9 @@ export default function LogsView() { 'peerdbLogTypeFilterForLogs', 'all' ); - const [currentPage, setCurrentPage] = useState(1); - const [totalPages, setTotalPages] = useState(1); const { data: mirrors }: { data: ListMirrorNamesResponse; error: any } = useSWR('/api/v1/mirrors/names', fetcher); - useEffect(() => { - setCurrentPage(1); - }, [mirrorName]); - - useEffect(() => { - const req: ListMirrorLogsRequest = { - level: logLevel, - flowJobName: mirrorName, - page: currentPage, - numPerPage: 15, - }; - - const fetchData = async () => { - try { - const response = await fetch('/api/v1/mirrors/logs', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - cache: 'no-store', - body: JSON.stringify(req), - }); - const data: ListMirrorLogsResponse = await response.json(); - const numPages = Math.ceil(data.total / req.numPerPage); - setLogs(data.errors); - setTotalPages(numPages); - } catch (error) { - console.error('Error fetching mirror logs:', error); - } - }; - - fetchData(); - }, [currentPage, 
mirrorName, logLevel]); - if (!mirrors) { return ; } @@ -107,12 +64,7 @@ export default function LogsView() { /> - + ); } diff --git a/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts b/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts index 42b74e6198..92b2f6cb98 100644 --- a/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts +++ b/ui/app/mirrors/[mirrorId]/aggregatedCountsByInterval.ts @@ -7,7 +7,7 @@ type timestampType = { count: number; }; -function aggregateCountsByInterval( +export default function aggregateCountsByInterval( timestamps: timestampType[], interval: TimeAggregateTypes ): [string, number][] { @@ -83,5 +83,3 @@ function aggregateCountsByInterval( return resultArray; } - -export default aggregateCountsByInterval; diff --git a/ui/app/mirrors/[mirrorId]/cdc.tsx b/ui/app/mirrors/[mirrorId]/cdc.tsx index e404749b01..34556379b1 100644 --- a/ui/app/mirrors/[mirrorId]/cdc.tsx +++ b/ui/app/mirrors/[mirrorId]/cdc.tsx @@ -1,5 +1,5 @@ 'use client'; -import { CDCBatch, MirrorStatusResponse } from '@/grpc_generated/route'; +import { MirrorStatusResponse } from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle'; import { Tab, TabGroup, TabList, TabPanel, TabPanels } from '@tremor/react'; @@ -10,14 +10,9 @@ import { SnapshotStatusTable } from './snapshot'; type CDCMirrorStatusProps = { status: MirrorStatusResponse; - rows: CDCBatch[]; syncStatusChild?: React.ReactNode; }; -export function CDCMirror({ - status, - rows, - syncStatusChild, -}: CDCMirrorStatusProps) { +export function CDCMirror({ status, syncStatusChild }: CDCMirrorStatusProps) { const LocalStorageTabKey = 'cdctab'; const [selectedTab, setSelectedTab] = useLocalStorage(LocalStorageTabKey, 0); const [mounted, setMounted] = useState(false); @@ -60,7 +55,6 @@ export function CDCMirror({ (); - let rowsSynced = syncs.reduce((acc, sync) => { - if (sync.endTime !== null) { - return acc + Number(sync.numRows); - } - return acc; - }, 0); +export default function CdcDetails({ + createdAt, + mirrorConfig, + mirrorStatus, +}: props) { + const [syncInterval, setSyncInterval] = useState(); const tablesSynced = mirrorConfig.config?.tableMappings; useEffect(() => { - getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? '').then( - (res) => { - getSyncInterval(res); - } + getCurrentIdleTimeout(mirrorConfig.config?.flowJobName ?? '').then((res) => + setSyncInterval(res) ); }, [mirrorConfig.config?.flowJobName]); return ( @@ -82,8 +77,8 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
@@ -95,8 +90,8 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
@@ -129,7 +124,9 @@ function CdcDetails({ syncs, createdAt, mirrorConfig, mirrorStatus }: props) {
- +
@@ -151,8 +148,7 @@ const SyncIntervalLabel: React.FC<{ syncInterval?: number }> = ({ if (!syncInterval) { return ; - } - if (syncInterval >= 3600) { + } else if (syncInterval >= 3600) { const hours = Math.floor(syncInterval / 3600); formattedInterval = `${hours} hour${hours !== 1 ? 's' : ''}`; } else if (syncInterval >= 60) { @@ -164,5 +160,3 @@ const SyncIntervalLabel: React.FC<{ syncInterval?: number }> = ({ return ; }; - -export default CdcDetails; diff --git a/ui/app/mirrors/[mirrorId]/cdcGraph.tsx b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx index 02e2c0d26a..e022101daa 100644 --- a/ui/app/mirrors/[mirrorId]/cdcGraph.tsx +++ b/ui/app/mirrors/[mirrorId]/cdcGraph.tsx @@ -5,35 +5,44 @@ import { TimeAggregateTypes, timeOptions, } from '@/app/utils/graph'; -import { CDCBatch } from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { BarChart } from '@tremor/react'; -import { useMemo, useState } from 'react'; +import { useEffect, useState } from 'react'; import ReactSelect from 'react-select'; -import aggregateCountsByInterval from './aggregatedCountsByInterval'; -type CdcGraphProps = { - syncs: CDCBatch[]; -}; +type CdcGraphProps = { mirrorName: string }; -function CdcGraph({ syncs }: CdcGraphProps) { - let [aggregateType, setAggregateType] = useState( +export default function CdcGraph({ mirrorName }: CdcGraphProps) { + const [aggregateType, setAggregateType] = useState( TimeAggregateTypes.HOUR ); + const [graphValues, setGraphValues] = useState< + { name: string; 'Rows synced at a point in time': number }[] + >([]); - const graphValues = useMemo(() => { - const rows = syncs.map((sync) => ({ - timestamp: sync.endTime, - count: sync.numRows, - })); - let timedRowCounts = aggregateCountsByInterval(rows, aggregateType); - timedRowCounts = timedRowCounts.slice(0, 29); - timedRowCounts = timedRowCounts.reverse(); - return timedRowCounts.map((count) => ({ - name: formatGraphLabel(new Date(count[0]), aggregateType), - 'Rows synced at a point in time': Number(count[1]), - })); - }, [syncs, aggregateType]); + useEffect(() => { + const fetchData = async () => { + const req: any = { + flowJobName: mirrorName, + aggregateType, + }; + + const res = await fetch('/api/v1/mirrors/cdc/graph', { + method: 'POST', + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: { data: { time: number; rows: number }[] } = await res.json(); + setGraphValues( + data.data.map(({ time, rows }) => ({ + name: formatGraphLabel(new Date(time), aggregateType), + 'Rows synced at a point in time': Number(rows), + })) + ); + }; + + fetchData(); + }, [mirrorName, aggregateType]); return (
@@ -59,5 +68,3 @@ function CdcGraph({ syncs }: CdcGraphProps) {
); } - -export default CdcGraph; diff --git a/ui/app/mirrors/[mirrorId]/handlers.ts b/ui/app/mirrors/[mirrorId]/handlers.ts index 7e68f26a53..0a3f46a4e2 100644 --- a/ui/app/mirrors/[mirrorId]/handlers.ts +++ b/ui/app/mirrors/[mirrorId]/handlers.ts @@ -12,6 +12,7 @@ export const getMirrorState = async ( body: JSON.stringify({ flow_job_name, include_flow_info: true, + exclude_batches: true, }), }); if (!res.ok) throw res.json(); diff --git a/ui/app/mirrors/[mirrorId]/page.tsx b/ui/app/mirrors/[mirrorId]/page.tsx index 5516e0986a..33832a66cd 100644 --- a/ui/app/mirrors/[mirrorId]/page.tsx +++ b/ui/app/mirrors/[mirrorId]/page.tsx @@ -51,12 +51,7 @@ export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) { let actionsDropdown = null; if (mirrorState?.cdcStatus) { - syncStatusChild = ( - - ); + syncStatusChild = ; const dbType = dBTypeFromJSON(mirrorState.cdcStatus.destinationType); @@ -93,11 +88,7 @@ export default function ViewMirror({ params: { mirrorId } }: EditMirrorProps) {
{mirrorId}
{actionsDropdown} - + ); } else if (mirrorState?.qrepStatus) { diff --git a/ui/app/mirrors/[mirrorId]/qrepGraph.tsx b/ui/app/mirrors/[mirrorId]/qrepGraph.tsx index 84eb958935..7bc1dbff5d 100644 --- a/ui/app/mirrors/[mirrorId]/qrepGraph.tsx +++ b/ui/app/mirrors/[mirrorId]/qrepGraph.tsx @@ -17,22 +17,20 @@ type QRepGraphProps = { }; function QrepGraph({ syncs }: QRepGraphProps) { - let [aggregateType, setAggregateType] = useState( + const [aggregateType, setAggregateType] = useState( TimeAggregateTypes.HOUR ); const initialCount: [string, number][] = []; - let [counts, setCounts] = useState(initialCount); + const [counts, setCounts] = useState(initialCount); useEffect(() => { - let rows = syncs.map((sync) => ({ + const rows = syncs.map((sync) => ({ timestamp: sync.startTime!, count: Number(sync.rowsInPartition) ?? 0, })); - let counts = aggregateCountsByInterval(rows, aggregateType); - counts = counts.slice(0, 29); - counts = counts.reverse(); - setCounts(counts); + const counts = aggregateCountsByInterval(rows, aggregateType); + setCounts(counts.slice(0, 29).reverse()); }, [aggregateType, syncs]); return ( diff --git a/ui/app/mirrors/[mirrorId]/syncStatus.tsx b/ui/app/mirrors/[mirrorId]/syncStatus.tsx index 0c2d2ba49d..fb45bfc6e7 100644 --- a/ui/app/mirrors/[mirrorId]/syncStatus.tsx +++ b/ui/app/mirrors/[mirrorId]/syncStatus.tsx @@ -1,6 +1,6 @@ 'use client'; import { fetcher } from '@/app/utils/swr'; -import { CDCBatch, CDCTableTotalCountsResponse } from '@/grpc_generated/route'; +import { CDCTableTotalCountsResponse } from '@/grpc_generated/route'; import useSWR from 'swr'; import CdcGraph from './cdcGraph'; import RowsDisplay from './rowsDisplay'; @@ -9,10 +9,9 @@ import TableStats from './tableStats'; type SyncStatusProps = { flowJobName: string; - rows: CDCBatch[]; }; -export default function SyncStatus({ flowJobName, rows }: SyncStatusProps) { +export default function SyncStatus({ flowJobName }: SyncStatusProps) { const { data: tableStats, error, @@ -31,9 +30,9 @@ export default function SyncStatus({ flowJobName, rows }: SyncStatusProps) {
- +
- +
) diff --git a/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx index 6453408948..493822fcf3 100644 --- a/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx +++ b/ui/app/mirrors/[mirrorId]/syncStatusTable.tsx @@ -1,22 +1,21 @@ 'use client'; -import SelectTheme from '@/app/styles/select'; import TimeLabel from '@/components/TimeComponent'; -import { CDCBatch } from '@/grpc_generated/route'; +import { + CDCBatch, + GetCDCBatchesRequest, + GetCDCBatchesResponse, +} from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; import { ProgressCircle } from '@/lib/ProgressCircle'; -import { SearchField } from '@/lib/SearchField'; import { Table, TableCell, TableRow } from '@/lib/Table'; import moment from 'moment'; -import { useMemo, useState } from 'react'; -import ReactSelect from 'react-select'; +import { useCallback, useEffect, useState } from 'react'; import { RowDataFormatter } from './rowsDisplay'; -type SyncStatusTableProps = { - rows: CDCBatch[]; -}; +type SyncStatusTableProps = { mirrorName: string }; function TimeWithDurationOrRunning({ startTime, @@ -46,63 +45,54 @@ function TimeWithDurationOrRunning({ } const ROWS_PER_PAGE = 5; -const sortOptions = [ - { value: 'batchId', label: 'Batch ID' }, - { value: 'startTime', label: 'Start Time' }, - { value: 'endTime', label: 'End Time' }, - { value: 'numRows', label: 'Rows Synced' }, -]; - -export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { +export const SyncStatusTable = ({ mirrorName }: SyncStatusTableProps) => { + const [totalPages, setTotalPages] = useState(1); const [currentPage, setCurrentPage] = useState(1); - const [sortField, setSortField] = useState< - 'startTime' | 'endTime' | 'numRows' | 'batchId' - >('batchId'); - - const [sortDir, setSortDir] = useState<'asc' | 'dsc'>('dsc'); - const totalPages = Math.ceil(rows.length / ROWS_PER_PAGE); - const [searchQuery, setSearchQuery] = useState(NaN); - const displayedRows = useMemo(() => { - const searchRows = rows.filter((row) => row.batchId == searchQuery); - const shownRows = searchRows.length > 0 ? searchRows : rows; - shownRows.sort((a, b) => { - let aValue: any = a[sortField]; - let bValue: any = b[sortField]; - if (aValue === undefined || bValue === undefined) { - return 0; - } - if (sortField === 'batchId') { - aValue = BigInt(aValue); - bValue = BigInt(bValue); - } + const [ascending, setAscending] = useState(false); + const [[beforeId, afterId], setBeforeAfterId] = useState([-1, -1]); + const [batches, setBatches] = useState([]); - if (aValue < bValue) { - return sortDir === 'dsc' ? 1 : -1; - } else if (aValue > bValue) { - return sortDir === 'dsc' ? -1 : 1; - } else { - return 0; - } - }); + useEffect(() => { + const fetchData = async () => { + const req: GetCDCBatchesRequest = { + flowJobName: mirrorName, + limit: ROWS_PER_PAGE, + beforeId: beforeId, + afterId: afterId, + ascending, + }; + const res = await fetch('/api/v1/mirrors/cdc/batches', { + method: 'POST', + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: GetCDCBatchesResponse = await res.json(); + setBatches(data.cdcBatches ?? []); + setCurrentPage(data.page); + setTotalPages(Math.ceil(data.total / req.limit)); + }; - const startRow = (currentPage - 1) * ROWS_PER_PAGE; - const endRow = startRow + ROWS_PER_PAGE; - return shownRows.length > ROWS_PER_PAGE - ? 
shownRows.slice(startRow, endRow) - : shownRows; - }, [searchQuery, currentPage, rows, sortField, sortDir]); + fetchData(); + }, [mirrorName, beforeId, afterId, ascending]); - const handlePrevPage = () => { - if (currentPage > 1) { - setCurrentPage(currentPage - 1); + const nextPage = useCallback(() => { + if (batches.length === 0) { + setBeforeAfterId([-1, ascending ? 0 : -1]); + } else if (ascending) { + setBeforeAfterId([-1, batches[batches.length - 1].batchId]); + } else { + setBeforeAfterId([batches[batches.length - 1].batchId, -1]); } - }; - - const handleNextPage = () => { - if (currentPage < totalPages) { - setCurrentPage(currentPage + 1); + }, [batches, ascending]); + const prevPage = useCallback(() => { + if (batches.length === 0 || currentPage < 3) { + setBeforeAfterId([-1, ascending ? 0 : -1]); + } else if (ascending) { + setBeforeAfterId([batches[0].batchId, -1]); + } else { + setBeforeAfterId([-1, batches[0].batchId]); } - }; + }, [batches, ascending, currentPage]); return ( { toolbar={{ left: (
- - @@ -123,53 +113,30 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { > -
- opt.value === sortField) - ?.label, - }} - onChange={(val, _) => { - const sortVal = - (val?.value as - | 'startTime' - | 'endTime' - | 'numRows' - | 'batchId') ?? 'batchId'; - setSortField(sortVal); - }} - defaultValue={{ value: 'batchId', label: 'Batch ID' }} - theme={SelectTheme} - /> -
), - right: ( - ) => - setSearchQuery(+e.target.value) - } - /> - ), }} header={ @@ -185,7 +152,7 @@ export const SyncStatusTable = ({ rows }: SyncStatusTableProps) => { } > - {displayedRows.map((row) => ( + {batches.map((row) => ( diff --git a/ui/app/mirrors/errors/[mirrorName]/page.tsx b/ui/app/mirrors/errors/[mirrorName]/page.tsx index 42c36c336d..af8acfb66d 100644 --- a/ui/app/mirrors/errors/[mirrorName]/page.tsx +++ b/ui/app/mirrors/errors/[mirrorName]/page.tsx @@ -1,56 +1,13 @@ 'use client'; import LogsTable from '@/components/LogsTable'; -import { - ListMirrorLogsRequest, - ListMirrorLogsResponse, - MirrorLog, -} from '@/grpc_generated/route'; import { Label } from '@/lib/Label'; import { useParams } from 'next/navigation'; -import { useEffect, useState } from 'react'; import { ToastContainer } from 'react-toastify'; import 'react-toastify/dist/ReactToastify.css'; export default function MirrorError() { const params = useParams<{ mirrorName: string }>(); - const [mirrorErrors, setMirrorErrors] = useState([]); - const [currentPage, setCurrentPage] = useState(1); - const [totalPages, setTotalPages] = useState(1); - - useEffect(() => { - setCurrentPage(1); - }, [params.mirrorName]); - - useEffect(() => { - const req: ListMirrorLogsRequest = { - flowJobName: params.mirrorName, - page: currentPage, - numPerPage: 10, - level: 'all', - }; - - const fetchData = async () => { - try { - const response = await fetch('/api/v1/mirrors/logs', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - cache: 'no-store', - body: JSON.stringify(req), - }); - const data: ListMirrorLogsResponse = await response.json(); - const numPages = Math.ceil(data.total / req.numPerPage); - setMirrorErrors(data.errors); - setTotalPages(numPages); - } catch (error) { - console.error('Error fetching mirror errors:', error); - } - }; - - fetchData(); - }, [currentPage, params.mirrorName]); return ( <> @@ -72,10 +29,9 @@ export default function MirrorError() { diff --git a/ui/app/peers/[peerName]/lagGraph.tsx b/ui/app/peers/[peerName]/lagGraph.tsx index 7107b7d809..87b90fa8c8 100644 --- a/ui/app/peers/[peerName]/lagGraph.tsx +++ b/ui/app/peers/[peerName]/lagGraph.tsx @@ -22,7 +22,7 @@ function parseLSN(lsn: string): number { if (!lsn) return 0; const [lsn1, lsn2] = lsn.split('/'); return Number( - (BigInt(parseInt(lsn1)) << BigInt(32)) | BigInt(parseInt(lsn2)) + (BigInt(parseInt(lsn1, 16)) << BigInt(32)) | BigInt(parseInt(lsn2, 16)) ); } @@ -135,7 +135,7 @@ export default function LagGraph({ peerName }: LagGraphProps) { /> setShowLsn((val) => !val)} /> )} diff --git a/ui/app/settings/page.tsx b/ui/app/settings/page.tsx index e5bb5ad15b..7ebb1b4cd0 100644 --- a/ui/app/settings/page.tsx +++ b/ui/app/settings/page.tsx @@ -123,9 +123,6 @@ const DynamicSettingItem = ({ const updatedSetting = { ...setting, value: newValue }; await fetch('/api/v1/dynamic_settings', { method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, body: JSON.stringify(updatedSetting), }); setEditMode(false); diff --git a/ui/components/LogsTable.tsx b/ui/components/LogsTable.tsx index 7d3486158a..cc09deeb7e 100644 --- a/ui/components/LogsTable.tsx +++ b/ui/components/LogsTable.tsx @@ -1,9 +1,14 @@ import TimeLabel from '@/components/TimeComponent'; -import { MirrorLog } from '@/grpc_generated/route'; +import { + ListMirrorLogsRequest, + ListMirrorLogsResponse, + MirrorLog, +} from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; import { Label } from '@/lib/Label'; 
import { Table, TableCell, TableRow } from '@/lib/Table'; +import { useCallback, useEffect, useState } from 'react'; import 'react-toastify/dist/ReactToastify.css'; const colorForErrorType = (errorType: string) => { @@ -25,26 +30,60 @@ const extractFromCloneName = (mirrorOrCloneName: string) => { }; export default function LogsTable({ - logs, - currentPage, - totalPages, - setCurrentPage, + numPerPage, + mirrorName, + logLevel, }: { - logs: MirrorLog[]; - currentPage: number; - totalPages: number; - setCurrentPage: (page: number) => void; + numPerPage: number; + mirrorName: string; + logLevel: string; }) { - const handleNextPage = () => { - if (currentPage < totalPages) { - setCurrentPage(currentPage + 1); + const [logs, setLogs] = useState([]); + const [currentPage, setCurrentPage] = useState(1); + const [totalPages, setTotalPages] = useState(1); + const [[beforeId, afterId], setBeforeAfterId] = useState([-1, -1]); + const nextPage = useCallback(() => { + if (logs.length === 0) { + setBeforeAfterId([-1, -1]); } - }; - const handlePrevPage = () => { - if (currentPage > 1) { - setCurrentPage(currentPage - 1); + setBeforeAfterId([logs[logs.length - 1].id, -1]); + }, [logs]); + const prevPage = useCallback(() => { + if (logs.length === 0 || currentPage < 3) { + setBeforeAfterId([-1, -1]); } - }; + setBeforeAfterId([-1, logs[0].id]); + }, [logs, currentPage]); + + useEffect(() => { + const fetchData = async () => { + const req: ListMirrorLogsRequest = { + level: logLevel, + flowJobName: mirrorName, + beforeId, + afterId, + numPerPage, + page: 0, // deprecated + }; + + try { + const response = await fetch('/api/v1/mirrors/logs', { + method: 'POST', + cache: 'no-store', + body: JSON.stringify(req), + }); + const data: ListMirrorLogsResponse = await response.json(); + const numPages = Math.ceil(data.total / req.numPerPage); + setLogs(data.errors); + setTotalPages(numPages); + setCurrentPage(data.page); + } catch (error) { + console.error('Error fetching mirror logs:', error); + } + }; + + fetchData(); + }, [mirrorName, logLevel, numPerPage, afterId, beforeId]); return (
- - @@ -82,7 +121,7 @@ export default function LogsTable({ }} > {logs.map((log, idx) => ( - +
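
Note on the id-based paging used above: GetCDCBatchesRequest and ListMirrorLogsRequest replace offset paging with `before_id`/`after_id` cursors, and syncStatusTable.tsx / LogsTable.tsx derive the next cursor pair from the first and last row currently on screen. The sketch below restates that bookkeeping as standalone TypeScript; the `Cursor` type and helper names are illustrative only, while the request fields and the `-1` "no bound" sentinel come from the diff.

```ts
// Sketch only: cursor bookkeeping for the keyset-paginated batches endpoint.
// Assumes the generated GetCDCBatchesRequest/GetCDCBatchesResponse types and
// numeric batch ids, as used in syncStatusTable.tsx above.
import {
  GetCDCBatchesRequest,
  GetCDCBatchesResponse,
} from '@/grpc_generated/route';

type Cursor = { beforeId: number; afterId: number };

// Page forward: in descending order the next page is everything older than the
// last visible batch, so its id becomes beforeId; ascending is the mirror image.
export function nextCursor(batchIds: number[], ascending: boolean): Cursor {
  if (batchIds.length === 0) {
    return { beforeId: -1, afterId: ascending ? 0 : -1 };
  }
  const last = batchIds[batchIds.length - 1];
  return ascending
    ? { beforeId: -1, afterId: last }
    : { beforeId: last, afterId: -1 };
}

// Page backward: bound by the first visible batch instead; near the start,
// reset to the unbounded first page.
export function prevCursor(
  batchIds: number[],
  ascending: boolean,
  currentPage: number
): Cursor {
  if (batchIds.length === 0 || currentPage < 3) {
    return { beforeId: -1, afterId: ascending ? 0 : -1 };
  }
  const first = batchIds[0];
  return ascending
    ? { beforeId: first, afterId: -1 }
    : { beforeId: -1, afterId: first };
}

// Example: fetch one page of five batches, newest first.
export async function fetchBatchesPage(
  mirrorName: string,
  cursor: Cursor
): Promise<GetCDCBatchesResponse> {
  const req: GetCDCBatchesRequest = {
    flowJobName: mirrorName,
    limit: 5,
    ascending: false,
    beforeId: cursor.beforeId,
    afterId: cursor.afterId,
  };
  const res = await fetch('/api/v1/mirrors/cdc/batches', {
    method: 'POST',
    cache: 'no-store',
    body: JSON.stringify(req),
  });
  return res.json();
}
```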
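
The lagGraph.tsx change switches `parseLSN` to radix 16, since a Postgres LSN such as `16/B374D848` is two hexadecimal 32-bit halves. A small worked example, using only standard TypeScript (no project imports), of why the radix matters:

```ts
// Postgres prints an LSN as two hex halves separated by '/'.
// Parsing them without an explicit radix (the old behaviour) mangles the value.
function parseLSN(lsn: string): number {
  if (!lsn) return 0;
  const [hi, lo] = lsn.split('/');
  return Number(
    (BigInt(parseInt(hi, 16)) << BigInt(32)) | BigInt(parseInt(lo, 16))
  );
}

// '16/B374D848': hi = 0x16 = 22, lo = 0xB374D848 = 3010779208,
// so the LSN is 22 * 2^32 + 3010779208 = 97500059720.
console.log(parseLSN('16/B374D848')); // 97500059720
// Without radix 16, parseInt('B374D848') is NaN and parseInt('16') reads the
// high half as decimal, so lag computed from these values was wrong.
```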
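
The `exclude_batches` flag on MirrorStatusRequest pairs with the new `rows_synced` field on CDCMirrorStatus: the status call no longer has to ship every CDC batch just so the details pane can sum `numRows` in the browser. A minimal sketch of the consuming side, assuming ts-proto's camelCase field names (`cdcStatus`, `rowsSynced`) on the generated MirrorStatusResponse:

```ts
import { MirrorStatusResponse } from '@/grpc_generated/route';

// With exclude_batches=true (see handlers.ts above) cdc_batches may be empty,
// so the total comes from the server-side aggregate instead of a client reduce.
export function totalRowsSynced(status: MirrorStatusResponse): number {
  // rows_synced is an int64 in the proto; Number() is fine for counts below 2^53.
  return Number(status.cdcStatus?.rowsSynced ?? 0);
}
```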