From 1b0353c232b4b2ad8e671d9d26cc7e20c07fa3c9 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Mon, 9 Oct 2023 20:24:50 +0530 Subject: [PATCH] UI - Create Mirror (#490) - Implements Create Mirror user flow - Tested happy path by running a successful PG-SF mirror - Snapshot fields appear only when initial copy is toggled - Staging path fields appear only when sync mode is avro - Frontend validation tested Fixes #432 and #433 --- flow/cmd/handler.go | 59 +++++ flow/generated/protos/route.pb.go | 235 +++++++++--------- nexus/analyzer/src/lib.rs | 4 +- nexus/flow-rs/src/grpc.rs | 1 + nexus/pt/src/peerdb_route.rs | 2 + nexus/pt/src/peerdb_route.serde.rs | 18 ++ nexus/server/src/main.rs | 3 +- nexus/sqlparser-rs | 2 +- protos/route.proto | 1 + ui/app/api/mirrors/cdc/route.ts | 21 ++ ui/app/api/peers/route.ts | 8 + ui/app/mirrors/create/config.tsx | 162 ++++++++++++ ui/app/mirrors/create/handlers.ts | 78 ++++++ ui/app/mirrors/create/helpers/cdc.ts | 129 ++++++++++ ui/app/mirrors/create/helpers/common.ts | 41 +++ ui/app/mirrors/create/page.tsx | 149 ++++++----- ui/app/mirrors/create/schema.ts | 74 ++++++ ui/app/mirrors/create/tablemapping.tsx | 93 +++++++ ui/app/mirrors/types.ts | 6 + .../create/configuration/helpers/common.ts | 2 +- .../peers/create/configuration/helpers/pg.ts | 4 +- .../peers/create/configuration/helpers/sf.ts | 4 +- ui/app/peers/create/configuration/page.tsx | 5 +- ui/app/peers/create/configuration/schema.ts | 2 +- ui/app/peers/handler.ts | 8 + ui/app/peers/page.tsx | 11 +- ui/components/ConfigForm.tsx | 8 +- ui/grpc_generated/route.ts | 18 +- 28 files changed, 933 insertions(+), 215 deletions(-) create mode 100644 ui/app/api/mirrors/cdc/route.ts create mode 100644 ui/app/mirrors/create/config.tsx create mode 100644 ui/app/mirrors/create/handlers.ts create mode 100644 ui/app/mirrors/create/helpers/cdc.ts create mode 100644 ui/app/mirrors/create/helpers/common.ts create mode 100644 ui/app/mirrors/create/schema.ts create mode 100644 ui/app/mirrors/create/tablemapping.tsx create mode 100644 ui/app/mirrors/types.ts create mode 100644 ui/app/peers/handler.ts diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go index 6a93ba3fde..cd25a87ca7 100644 --- a/flow/cmd/handler.go +++ b/flow/cmd/handler.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "strings" "time" "github.com/PeerDB-io/peer-flow/connectors" @@ -31,6 +32,57 @@ func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool) *Fl } } +func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (int32, int32, error) { + var id int32 + var peerType int32 + err := h.pool.QueryRow(ctx, "SELECT id,type FROM peers WHERE name = $1", peerName).Scan(&id, &peerType) + if err != nil { + log.Errorf("unable to query peer id for peer %s: %s", peerName, err.Error()) + return -1, -1, fmt.Errorf("unable to query peer id for peer %s: %s", peerName, err) + } + return id, peerType, nil +} + +func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string { + tableIdentifierParts := strings.Split(tableIdentifier, ".") + if len(tableIdentifierParts) == 1 && peerDBType != int32(protos.DBType_BIGQUERY) { + tableIdentifierParts = append([]string{"public"}, tableIdentifierParts...) + } + return strings.Join(tableIdentifierParts, ".") +} + +func (h *FlowRequestHandler) createFlowJobEntry(ctx context.Context, + req *protos.CreateCDCFlowRequest, workflowID string) error { + sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name) + if srcErr != nil { + return fmt.Errorf("unable to get peer id for source peer %s: %w", + req.ConnectionConfigs.Source.Name, srcErr) + } + + destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.Destination.Name) + if dstErr != nil { + return fmt.Errorf("unable to get peer id for target peer %s: %w", + req.ConnectionConfigs.Destination.Name, srcErr) + } + + for sourceTableIdentifier := range req.ConnectionConfigs.TableNameMapping { + destinationTableIdentifier := req.ConnectionConfigs.TableNameMapping[sourceTableIdentifier] + _, err := h.pool.Exec(ctx, ` + INSERT INTO flows (workflow_id, name, source_peer, destination_peer, description, + source_table_identifier, destination_table_identifier) VALUES ($1, $2, $3, $4, $5, $6, $7) + `, workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID, + "Mirror created via GRPC", + schemaForTableIdentifier(sourceTableIdentifier, sourePeerType), + schemaForTableIdentifier(destinationTableIdentifier, destinationPeerType)) + if err != nil { + return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w", + req.ConnectionConfigs.FlowJobName, sourceTableIdentifier, err) + } + } + + return nil +} + // Close closes the connection pool func (h *FlowRequestHandler) Close() { if h.pool != nil { @@ -59,6 +111,13 @@ func (h *FlowRequestHandler) CreateCDCFlow( MaxBatchSize: maxBatchSize, } + if req.CreateCatalogEntry { + err := h.createFlowJobEntry(ctx, req, workflowID) + if err != nil { + return nil, fmt.Errorf("unable to create flow job entry: %w", err) + } + } + state := peerflow.NewCDCFlowState() _, err := h.temporalClient.ExecuteWorkflow( ctx, // context diff --git a/flow/generated/protos/route.pb.go b/flow/generated/protos/route.pb.go index 9d580676fb..0918c82794 100644 --- a/flow/generated/protos/route.pb.go +++ b/flow/generated/protos/route.pb.go @@ -124,7 +124,8 @@ type CreateCDCFlowRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ConnectionConfigs *FlowConnectionConfigs `protobuf:"bytes,1,opt,name=connection_configs,json=connectionConfigs,proto3" json:"connection_configs,omitempty"` + ConnectionConfigs *FlowConnectionConfigs `protobuf:"bytes,1,opt,name=connection_configs,json=connectionConfigs,proto3" json:"connection_configs,omitempty"` + CreateCatalogEntry bool `protobuf:"varint,2,opt,name=create_catalog_entry,json=createCatalogEntry,proto3" json:"create_catalog_entry,omitempty"` } func (x *CreateCDCFlowRequest) Reset() { @@ -166,6 +167,13 @@ func (x *CreateCDCFlowRequest) GetConnectionConfigs() *FlowConnectionConfigs { return nil } +func (x *CreateCDCFlowRequest) GetCreateCatalogEntry() bool { + if x != nil { + return x.CreateCatalogEntry + } + return false +} + type CreateCDCFlowResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -730,120 +738,123 @@ var file_route_proto_rawDesc = []byte{ 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0b, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0a, 0x66, 0x6c, 0x6f, 0x77, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x69, 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, - 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x51, 0x0a, - 0x12, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, 0x6e, - 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x11, 0x63, - 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, - 0x22, 0x36, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, - 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x77, 0x6f, 0x72, - 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x77, - 0x6f, 0x72, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x22, 0x51, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, - 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x38, 0x0a, 0x0b, 0x71, 0x72, 0x65, 0x70, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, - 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, - 0x0a, 0x71, 0x72, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x37, 0x0a, 0x16, 0x43, - 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x66, 0x6c, 0x6f, 0x77, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x77, 0x6f, 0x72, 0x66, 0x6c, - 0x6f, 0x77, 0x49, 0x64, 0x22, 0xca, 0x01, 0x0a, 0x0f, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, - 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x77, 0x6f, 0x72, 0x6b, - 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, - 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x66, 0x6c, 0x6f, - 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x33, 0x0a, - 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, - 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x65, - 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x10, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, - 0x52, 0x0f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x65, 0x65, - 0x72, 0x22, 0x47, 0x0a, 0x10, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x12, 0x0a, 0x10, 0x4c, 0x69, - 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x3d, - 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, - 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x22, 0x3d, 0x0a, - 0x13, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x9b, 0x01, 0x0a, 0x14, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x51, + 0x0a, 0x12, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x43, 0x6f, 0x6e, + 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x52, 0x11, + 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x73, 0x12, 0x30, 0x0a, 0x14, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x5f, 0x63, 0x61, 0x74, 0x61, + 0x6c, 0x6f, 0x67, 0x5f, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x12, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x61, 0x74, 0x61, 0x6c, 0x6f, 0x67, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x22, 0x36, 0x0a, 0x15, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, + 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, + 0x77, 0x6f, 0x72, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x77, 0x6f, 0x72, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x22, 0x51, 0x0a, 0x15, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x38, 0x0a, 0x0b, 0x71, 0x72, 0x65, 0x70, 0x5f, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x51, 0x52, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x52, 0x0a, 0x71, 0x72, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x37, + 0x0a, 0x16, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x66, + 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x77, 0x6f, + 0x72, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x22, 0xca, 0x01, 0x0a, 0x0f, 0x53, 0x68, 0x75, 0x74, + 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x77, + 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x22, 0x0a, 0x0d, + 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x6a, 0x6f, 0x62, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x4a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, + 0x12, 0x33, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, + 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x10, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, + 0x65, 0x65, 0x72, 0x52, 0x0f, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x50, 0x65, 0x65, 0x72, 0x22, 0x47, 0x0a, 0x10, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x02, 0x6f, 0x6b, 0x12, 0x23, 0x0a, 0x0d, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0c, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x12, 0x0a, + 0x10, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x22, 0x3d, 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, + 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x05, 0x70, 0x65, 0x65, 0x72, 0x73, + 0x22, 0x3d, 0x0a, 0x13, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x26, 0x0a, 0x04, 0x70, 0x65, 0x65, 0x72, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, + 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x04, 0x70, 0x65, 0x65, 0x72, 0x22, + 0x3b, 0x0a, 0x11, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x26, 0x0a, 0x04, 0x70, 0x65, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, - 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x04, 0x70, 0x65, 0x65, 0x72, 0x22, 0x3b, 0x0a, 0x11, - 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x26, 0x0a, 0x04, 0x70, 0x65, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x12, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x70, 0x65, 0x65, 0x72, 0x73, 0x2e, 0x50, - 0x65, 0x65, 0x72, 0x52, 0x04, 0x70, 0x65, 0x65, 0x72, 0x22, 0x6a, 0x0a, 0x14, 0x56, 0x61, 0x6c, - 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x38, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0e, 0x32, 0x20, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, - 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x66, 0x0a, 0x12, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, - 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x06, 0x73, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x70, 0x65, - 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, - 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0x42, 0x0a, - 0x12, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x10, 0x43, 0x52, 0x45, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, - 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x56, 0x41, 0x4c, - 0x49, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, - 0x02, 0x2a, 0x43, 0x0a, 0x10, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x41, 0x54, - 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, - 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x46, 0x41, - 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x32, 0x95, 0x04, 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4e, 0x0a, 0x09, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, - 0x65, 0x72, 0x73, 0x12, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, - 0x74, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, - 0x74, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0c, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, - 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, - 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, - 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, - 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, - 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x51, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x1f, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, - 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, + 0x73, 0x2e, 0x50, 0x65, 0x65, 0x72, 0x52, 0x04, 0x70, 0x65, 0x65, 0x72, 0x22, 0x6a, 0x0a, 0x14, + 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0e, 0x32, 0x20, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, + 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, + 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x66, 0x0a, 0x12, 0x43, 0x72, 0x65, 0x61, + 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, + 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, - 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x5a, 0x0a, 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, - 0x6c, 0x6f, 0x77, 0x12, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, - 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, - 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, - 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5d, - 0x0a, 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, - 0x12, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, - 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, - 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, - 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, - 0x0c, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x1d, 0x2e, - 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, - 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, - 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, 0x68, 0x75, 0x74, - 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x7c, - 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, - 0x74, 0x65, 0x42, 0x0a, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, - 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, - 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, - 0x6f, 0x75, 0x74, 0x65, 0xe2, 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, - 0x74, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, 0x02, - 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, + 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x2a, 0x42, 0x0a, 0x12, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x10, 0x43, 0x52, 0x45, 0x41, 0x54, 0x49, + 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, + 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41, 0x4c, + 0x49, 0x44, 0x10, 0x02, 0x2a, 0x43, 0x0a, 0x10, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, + 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x16, 0x0a, 0x12, 0x56, 0x41, 0x4c, 0x49, + 0x44, 0x41, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, + 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x52, 0x45, 0x41, 0x54, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, + 0x06, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x32, 0x95, 0x04, 0x0a, 0x0b, 0x46, 0x6c, + 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4e, 0x0a, 0x09, 0x4c, 0x69, 0x73, + 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x12, 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x65, 0x65, 0x72, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x57, 0x0a, 0x0c, 0x56, 0x61, 0x6c, + 0x69, 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x12, 0x21, 0x2e, 0x70, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, 0x64, 0x61, 0x74, + 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x70, + 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x56, 0x61, 0x6c, 0x69, + 0x64, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x51, 0x0a, 0x0a, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, + 0x12, 0x1f, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, + 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x20, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, + 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x65, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5a, 0x0a, 0x0d, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, + 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x12, 0x22, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x43, 0x44, 0x43, 0x46, + 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x70, 0x65, 0x65, + 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x43, 0x44, 0x43, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x12, 0x5d, 0x0a, 0x0e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, + 0x6c, 0x6f, 0x77, 0x12, 0x23, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, + 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, 0x65, 0x70, 0x46, 0x6c, 0x6f, + 0x77, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, + 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x51, 0x52, + 0x65, 0x70, 0x46, 0x6c, 0x6f, 0x77, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x4f, 0x0a, 0x0c, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x46, 0x6c, 0x6f, 0x77, + 0x12, 0x1d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, + 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x1e, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x2e, 0x53, + 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x42, 0x7c, 0x0a, 0x10, 0x63, 0x6f, 0x6d, 0x2e, 0x70, 0x65, 0x65, 0x72, 0x64, 0x62, 0x5f, + 0x72, 0x6f, 0x75, 0x74, 0x65, 0x42, 0x0a, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x50, 0x01, 0x5a, 0x10, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x61, 0x74, 0x65, 0x64, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x73, 0xa2, 0x02, 0x03, 0x50, 0x58, 0x58, 0xaa, 0x02, 0x0b, 0x50, 0x65, + 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xca, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, + 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0xe2, 0x02, 0x17, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, + 0x52, 0x6f, 0x75, 0x74, 0x65, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0xea, 0x02, 0x0b, 0x50, 0x65, 0x65, 0x72, 0x64, 0x62, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index b9d01e1739..a73d02a687 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -152,14 +152,14 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> { match create_mirror { CDC(cdc) => { let mut flow_job_table_mappings = vec![]; - for table_mapping in &cdc.table_mappings { + for table_mapping in &cdc.mappings { flow_job_table_mappings.push(FlowJobTableMapping { source_table_identifier: table_mapping .source_table_identifier .to_string() .to_lowercase(), target_table_identifier: table_mapping - .target_table_identifier + .target_identifier .to_string() .to_lowercase(), }); diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 95f3c206cb..b628570afb 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -111,6 +111,7 @@ impl FlowGrpcClient { ) -> anyhow::Result { let create_peer_flow_req = pt::peerdb_route::CreateCdcFlowRequest { connection_configs: Some(peer_flow_config), + create_catalog_entry: false }; let response = self.client.create_cdc_flow(create_peer_flow_req).await?; let workflow_id = response.into_inner().worflow_id; diff --git a/nexus/pt/src/peerdb_route.rs b/nexus/pt/src/peerdb_route.rs index f564f070e4..e07ebdc70c 100644 --- a/nexus/pt/src/peerdb_route.rs +++ b/nexus/pt/src/peerdb_route.rs @@ -4,6 +4,8 @@ pub struct CreateCdcFlowRequest { #[prost(message, optional, tag="1")] pub connection_configs: ::core::option::Option, + #[prost(bool, tag="2")] + pub create_catalog_entry: bool, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/nexus/pt/src/peerdb_route.serde.rs b/nexus/pt/src/peerdb_route.serde.rs index 685aefd6f5..c3d59033f6 100644 --- a/nexus/pt/src/peerdb_route.serde.rs +++ b/nexus/pt/src/peerdb_route.serde.rs @@ -10,10 +10,16 @@ impl serde::Serialize for CreateCdcFlowRequest { if self.connection_configs.is_some() { len += 1; } + if self.create_catalog_entry { + len += 1; + } let mut struct_ser = serializer.serialize_struct("peerdb_route.CreateCDCFlowRequest", len)?; if let Some(v) = self.connection_configs.as_ref() { struct_ser.serialize_field("connectionConfigs", v)?; } + if self.create_catalog_entry { + struct_ser.serialize_field("createCatalogEntry", &self.create_catalog_entry)?; + } struct_ser.end() } } @@ -26,11 +32,14 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest { const FIELDS: &[&str] = &[ "connection_configs", "connectionConfigs", + "create_catalog_entry", + "createCatalogEntry", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { ConnectionConfigs, + CreateCatalogEntry, __SkipField__, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -54,6 +63,7 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest { { match value { "connectionConfigs" | "connection_configs" => Ok(GeneratedField::ConnectionConfigs), + "createCatalogEntry" | "create_catalog_entry" => Ok(GeneratedField::CreateCatalogEntry), _ => Ok(GeneratedField::__SkipField__), } } @@ -74,6 +84,7 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest { V: serde::de::MapAccess<'de>, { let mut connection_configs__ = None; + let mut create_catalog_entry__ = None; while let Some(k) = map.next_key()? { match k { GeneratedField::ConnectionConfigs => { @@ -82,6 +93,12 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest { } connection_configs__ = map.next_value()?; } + GeneratedField::CreateCatalogEntry => { + if create_catalog_entry__.is_some() { + return Err(serde::de::Error::duplicate_field("createCatalogEntry")); + } + create_catalog_entry__ = Some(map.next_value()?); + } GeneratedField::__SkipField__ => { let _ = map.next_value::()?; } @@ -89,6 +106,7 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest { } Ok(CreateCdcFlowRequest { connection_configs: connection_configs__, + create_catalog_entry: create_catalog_entry__.unwrap_or_default(), }) } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 20ff95daf3..e663b132ce 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -455,8 +455,7 @@ impl NexusBackend { "got workflow id: {:?}", workflow_details.as_ref().map(|w| &w.workflow_id) ); - if workflow_details.is_some() { - let workflow_details = workflow_details.unwrap(); + if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler .shutdown_flow_job(flow_job_name, workflow_details) diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs index 49b806a49c..271dafe34e 160000 --- a/nexus/sqlparser-rs +++ b/nexus/sqlparser-rs @@ -1 +1 @@ -Subproject commit 49b806a49c325cff7203f9b71d0c68cf9e237f30 +Subproject commit 271dafe34eb0b42586c4341382540dfd33623e9d diff --git a/protos/route.proto b/protos/route.proto index 96dd65aaaa..37d3b530df 100644 --- a/protos/route.proto +++ b/protos/route.proto @@ -8,6 +8,7 @@ package peerdb_route; message CreateCDCFlowRequest { peerdb_flow.FlowConnectionConfigs connection_configs = 1; + bool create_catalog_entry = 2; } message CreateCDCFlowResponse { diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts new file mode 100644 index 0000000000..ccb37ae797 --- /dev/null +++ b/ui/app/api/mirrors/cdc/route.ts @@ -0,0 +1,21 @@ +import { + CreateCDCFlowRequest, + CreateCDCFlowResponse, +} from '@/grpc_generated/route'; +import { GetFlowServiceClientFromEnv } from '@/rpc/rpc'; + +export async function POST(request: Request) { + const body = await request.json(); + const { config } = body; + const flowServiceClient = GetFlowServiceClientFromEnv(); + const req: CreateCDCFlowRequest = { + connectionConfigs: config, + createCatalogEntry: true, + }; + const createStatus: CreateCDCFlowResponse = + await flowServiceClient.createCdcFlow(req); + if (!createStatus.worflowId) { + return new Response('Failed to create CDC mirror'); + } + return new Response('created'); +} diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts index 808187624d..b43ad771ba 100644 --- a/ui/app/api/peers/route.ts +++ b/ui/app/api/peers/route.ts @@ -9,6 +9,7 @@ import { CreatePeerRequest, CreatePeerResponse, CreatePeerStatus, + ListPeersRequest, ValidatePeerRequest, ValidatePeerResponse, ValidatePeerStatus, @@ -63,3 +64,10 @@ export async function POST(request: Request) { } else return new Response('status of peer creation is unknown'); } else return new Response('mode of peer creation is unknown'); } + +export async function GET(request: Request) { + let flowServiceClient = GetFlowServiceClientFromEnv(); + let req: ListPeersRequest = {}; + let peers = await flowServiceClient.listPeers(req); + return new Response(JSON.stringify(peers)); +} diff --git a/ui/app/mirrors/create/config.tsx b/ui/app/mirrors/create/config.tsx new file mode 100644 index 0000000000..b3aae6c250 --- /dev/null +++ b/ui/app/mirrors/create/config.tsx @@ -0,0 +1,162 @@ +'use client'; +import { QRepSyncMode } from '@/grpc_generated/flow'; +import { Peer } from '@/grpc_generated/peers'; +import { Label } from '@/lib/Label'; +import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; +import { Select, SelectItem } from '@/lib/Select'; +import { Switch } from '@/lib/Switch'; +import { TextField } from '@/lib/TextField'; +import { Tooltip } from '@/lib/Tooltip'; +import { InfoPopover } from '../../../components/InfoPopover'; +import { MirrorConfig, MirrorSetter } from '../types'; +import { MirrorSetting } from './helpers/common'; + +interface MirrorConfigProps { + settings: MirrorSetting[]; + mirrorConfig: MirrorConfig; + peers: Peer[]; + setter: MirrorSetter; +} + +export default function MirrorConfig(props: MirrorConfigProps) { + const handleChange = (val: string | boolean, setting: MirrorSetting) => { + let stateVal: string | boolean | Peer | QRepSyncMode = val; + if (setting.label.includes('Peer')) { + stateVal = props.peers.find((peer) => peer.name === val)!; + } else if (setting.label.includes('Sync Mode')) { + stateVal = + val === 'avro' + ? QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO + : QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT; + } + setting.stateHandler(stateVal, props.setter); + }; + const paramDisplayCondition = (setting: MirrorSetting) => { + const label = setting.label.toLowerCase(); + if ( + (label.includes('snapshot') && + props.mirrorConfig.doInitialCopy !== true) || + (label.includes('snapshot staging') && + props.mirrorConfig.snapshotSyncMode?.toString() !== '1') || + (label.includes('cdc staging') && + props.mirrorConfig.cdcSyncMode?.toString() !== '1') + ) { + return false; + } + return true; + }; + + return ( + <> + {props.settings.map((setting, id) => { + return ( + paramDisplayCondition(setting) && + (setting.type === 'switch' ? ( + {setting.label}} + action={ +
+ handleChange(state, setting)} + /> + {setting.tips && ( + + )} +
+ } + /> + ) : setting.type === 'select' ? ( + {setting.label}} + action={ +
+ + {setting.tips && ( + + )} +
+ } + /> + ) : ( + + {setting.label} + {setting.required && ( + + + + )} + + } + action={ +
+ handleChange(e.target.value, setting)} + /> + {setting.tips && ( + + )} +
+ } + /> + )) + ); + })} + + ); +} diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts new file mode 100644 index 0000000000..5654c480a2 --- /dev/null +++ b/ui/app/mirrors/create/handlers.ts @@ -0,0 +1,78 @@ +import { ListPeersResponse } from '@/grpc_generated/route'; +import { AppRouterInstance } from 'next/dist/shared/lib/app-router-context'; +import { Dispatch, SetStateAction } from 'react'; +import { MirrorConfig, TableMapRow } from '../types'; +import { cdcSchema, tableMappingSchema } from './schema'; + +export const listAllPeers = async () => { + const peers = await fetch('../api/peers').then((res) => res.json()); + return peers as ListPeersResponse; +}; + +const validateFlowFields = ( + tableMapping: TableMapRow[], + setMsg: Dispatch>, + config: MirrorConfig +): boolean => { + let validationErr: string | undefined; + const tablesValidity = tableMappingSchema.safeParse(tableMapping); + if (!tablesValidity.success) { + validationErr = tablesValidity.error.issues[0].message; + setMsg({ ok: false, msg: validationErr }); + return false; + } + const configValidity = cdcSchema.safeParse(config); + if (!configValidity.success) { + validationErr = configValidity.error.issues[0].message; + setMsg({ ok: false, msg: validationErr }); + return false; + } + setMsg({ ok: true, msg: '' }); + return true; +}; + +const reformattedTableMapping = (tableMapping: TableMapRow[]) => { + const mapping: { [key: string]: string } = {}; + tableMapping.forEach((row) => { + mapping[row.source] = row.destination; + }); + return mapping; +}; +export const handleCreate = async ( + flowJobName: string, + rows: TableMapRow[], + config: MirrorConfig, + setMsg: Dispatch< + SetStateAction<{ + ok: boolean; + msg: string; + }> + >, + setLoading: Dispatch>, + router: AppRouterInstance +) => { + if (!flowJobName) { + setMsg({ ok: false, msg: 'Mirror name is required' }); + return; + } + const isValid = validateFlowFields(rows, setMsg, config); + if (!isValid) return; + const tableNameMapping = reformattedTableMapping(rows); + config['tableNameMapping'] = tableNameMapping; + config['flowJobName'] = flowJobName; + setLoading(true); + const statusMessage = await fetch('/api/mirrors/cdc', { + method: 'POST', + body: JSON.stringify({ + config, + }), + }).then((res) => res.text()); + if (statusMessage !== 'created') { + setMsg({ ok: false, msg: statusMessage }); + setLoading(false); + return; + } + setMsg({ ok: true, msg: 'CDC Mirror created successfully' }); + router.push('/mirrors'); + setLoading(false); +}; diff --git a/ui/app/mirrors/create/helpers/cdc.ts b/ui/app/mirrors/create/helpers/cdc.ts new file mode 100644 index 0000000000..c429ecab79 --- /dev/null +++ b/ui/app/mirrors/create/helpers/cdc.ts @@ -0,0 +1,129 @@ +import { QRepSyncMode } from '@/grpc_generated/flow'; +import { Peer } from '@/grpc_generated/peers'; +import { MirrorSetting } from './common'; +export const cdcSettings: MirrorSetting[] = [ + { + label: 'Source Peer', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, source: value as Peer })), + tips: 'The peer from which we will be replicating data. Ensure the prerequisites for this peer are met.', + helpfulLink: + 'https://docs.peerdb.io/usecases/Real-time%20CDC/postgres-to-snowflake#prerequisites', + type: 'select', + required: true, + }, + { + label: 'Destination Peer', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, destination: value as Peer })), + tips: 'The peer to which data will be replicated.', + type: 'select', + required: true, + }, + { + label: 'Initial Copy', + stateHandler: (value, setter) => + setter((curr) => ({ + ...curr, + doInitialCopy: (value as boolean) || false, + })), + tips: 'Specify if you want initial load to happen for your tables.', + helpfulLink: 'https://www.postgresql.org/docs/8.0/user-manag.html', + type: 'switch', + }, + { + label: 'Publication Name', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, publicationName: (value as string) || '' })), + tips: 'If set, PeerDB will use this publication for the mirror.', + }, + { + label: 'Replication Slot Name', + stateHandler: (value, setter) => + setter((curr) => ({ + ...curr, + replicationSlotName: (value as string) || '', + })), + tips: 'If set, PeerDB will use this slot for the mirror.', + }, + { + label: 'Snapshot Number of Rows Per Partition', + stateHandler: (value, setter) => + setter((curr) => ({ + ...curr, + snapshotNumRowsPerPartition: parseInt(value as string, 10) || 500000, + })), + tips: 'PeerDB splits up table data into partitions for increased performance. This setting controls the number of rows per partition. The default value is 500000.', + default: '500000', + type: 'number', + }, + { + label: 'Snapshot Maximum Parallel Workers', + stateHandler: (value, setter) => + setter((curr) => ({ + ...curr, + snapshotMaxParallelWorkers: parseInt(value as string, 10) || 8, + })), + tips: 'PeerDB spins up parallel threads for each partition. This setting controls the number of partitions to sync in parallel. The default value is 8.', + default: '8', + type: 'number', + }, + { + label: 'Snapshot Number of Tables In Parallel', + stateHandler: (value, setter) => + setter((curr) => ({ + ...curr, + snapshotNumTablesInParallel: parseInt(value as string, 10) || 1, + })), + tips: 'Specify the number of tables to sync perform initial load for, in parallel. The default value is 1.', + default: '1', + type: 'number', + }, + { + label: 'Snapshot Sync Mode', + stateHandler: (value, setter) => + setter((curr) => ({ + ...curr, + snapshotSyncMode: + (value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + })), + tips: 'Specify whether you want the sync mode for initial load to be via SQL or by staging AVRO files. The default mode is SQL.', + default: 'SQL', + type: 'select', + }, + { + label: 'CDC Sync Mode', + stateHandler: (value, setter) => + setter((curr) => ({ + ...curr, + cdcSyncMode: + (value as QRepSyncMode) || QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, + })), + tips: 'Specify whether you want the sync mode for CDC to be via SQL or by staging AVRO files. The default mode is SQL.', + default: 'SQL', + type: 'select', + }, + { + label: 'Snapshot Staging Path', + stateHandler: (value, setter) => + setter((curr) => ({ + ...curr, + snapshotStagingPath: value as string | '', + })), + tips: 'You can specify staging path if you have set the Snapshot sync mode as AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket URL.', + }, + { + label: 'CDC Staging Path', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, cdcStagingPath: (value as string) || '' })), + tips: 'You can specify staging path if you have set the CDC sync mode as AVRO. For Snowflake as destination peer, this must be either empty or an S3 bucket url', + }, + { + label: 'Soft Delete', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, softDelete: (value as boolean) || false })), + tips: 'Allows you to mark some records as deleted without actual erasure from the database', + default: 'SQL', + type: 'switch', + }, +]; diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts new file mode 100644 index 0000000000..f7a5384e9f --- /dev/null +++ b/ui/app/mirrors/create/helpers/common.ts @@ -0,0 +1,41 @@ +import { FlowConnectionConfigs, QRepSyncMode } from '@/grpc_generated/flow'; +import { Peer } from '@/grpc_generated/peers'; +import { MirrorSetter } from '../../types'; + +export interface MirrorSetting { + label: string; + stateHandler: ( + value: string | Peer | boolean | QRepSyncMode, + setter: MirrorSetter + ) => void; + type?: string; + required?: boolean; + tips?: string; + helpfulLink?: string; + default?: string | number; +} + +export const blankCDCSetting: FlowConnectionConfigs = { + source: undefined, + destination: undefined, + flowJobName: '', + tableSchema: undefined, + tableNameMapping: {}, + srcTableIdNameMapping: {}, + tableNameSchemaMapping: {}, + metadataPeer: undefined, + maxBatchSize: 0, + doInitialCopy: false, + publicationName: '', + snapshotNumRowsPerPartition: 500000, + snapshotMaxParallelWorkers: 8, + snapshotNumTablesInParallel: 1, + snapshotSyncMode: 0, + cdcSyncMode: 0, + snapshotStagingPath: '', + cdcStagingPath: '', + softDelete: false, + replicationSlotName: '', + pushBatchSize: 0, + pushParallelism: 0, +}; diff --git a/ui/app/mirrors/create/page.tsx b/ui/app/mirrors/create/page.tsx index 75341d4910..97752375e1 100644 --- a/ui/app/mirrors/create/page.tsx +++ b/ui/app/mirrors/create/page.tsx @@ -1,12 +1,40 @@ +'use client'; +import { FlowConnectionConfigs } from '@/grpc_generated/flow'; +import { Peer } from '@/grpc_generated/peers'; import { Button } from '@/lib/Button'; import { ButtonGroup } from '@/lib/ButtonGroup'; import { Label } from '@/lib/Label'; import { LayoutMain, RowWithSelect, RowWithTextField } from '@/lib/Layout'; import { Panel } from '@/lib/Panel'; -import { Select } from '@/lib/Select'; +import { Select, SelectItem } from '@/lib/Select'; import { TextField } from '@/lib/TextField'; - +import { Divider } from '@tremor/react'; +import Link from 'next/link'; +import { useRouter } from 'next/navigation'; +import { useEffect, useState } from 'react'; +import { TableMapRow } from '../types'; +import MirrorConfig from './config'; +import { handleCreate, listAllPeers } from './handlers'; +import { cdcSettings } from './helpers/cdc'; +import { blankCDCSetting } from './helpers/common'; +import TableMapping from './tablemapping'; export default function CreateMirrors() { + const router = useRouter(); + const [mirrorName, setMirrorName] = useState(''); + const [mirrorType, setMirrorType] = useState<'CDC' | 'QREP'>('CDC'); + const [formMessage, setFormMessage] = useState<{ ok: boolean; msg: string }>({ + ok: true, + msg: '', + }); + const [loading, setLoading] = useState(false); + const [config, setConfig] = useState(blankCDCSetting); + const [peers, setPeers] = useState([]); + const [rows, setRows] = useState([ + { source: '', destination: '' }, + ]); + useEffect(() => { + listAllPeers().then((peers) => setPeers(peers.peers)); + }, []); return ( @@ -18,100 +46,69 @@ export default function CreateMirrors() { - Mirror type } - action={ + CDC + } /> - - Source - - } - action={} - /> - Query - - } - action={ - - } - /> - - Watermark ID - - } - action={ - - } - /> - - Watermark table - - } + label={} action={ setMirrorName(e.target.value)} /> } /> - - Rows per partition - - } - action={ - - } + + + + + + {!loading && formMessage.msg.length > 0 && ( + + )} + - - + + diff --git a/ui/app/mirrors/create/schema.ts b/ui/app/mirrors/create/schema.ts new file mode 100644 index 0000000000..4db9e0fe47 --- /dev/null +++ b/ui/app/mirrors/create/schema.ts @@ -0,0 +1,74 @@ +import * as z from 'zod'; + +export const tableMappingSchema = z + .array( + z.object({ + source: z + .string() + .min(1, 'source table names, if added, must be non-empty'), + destination: z + .string() + .min(1, 'destination table names, if added, must be non-empty'), + }) + ) + .nonempty('At least one table mapping is required'); + +export const cdcSchema = z.object({ + source: z.object({ + name: z.string().nonempty(), + type: z.any(), + config: z.any(), + }), + destination: z.object({ + name: z.string().nonempty(), + type: z.any(), + config: z.any(), + }), + doInitialCopy: z.boolean().optional(), + publicationName: z + .string({ + invalid_type_error: 'Publication name must be a string', + }) + .max(255, 'Publication name must be less than 255 characters') + .optional(), + replicationSlotName: z + .string({ + invalid_type_error: 'Publication name must be a string', + }) + .max(255, 'Publication name must be less than 255 characters') + .optional(), + snapshotNumRowsPerPartition: z + .number({ + invalid_type_error: 'Snapshow rows per partition must be a number', + }) + .int() + .min(1, 'Snapshow rows per partition must be a positive integer') + .optional(), + snapshotMaxParallelWorkers: z + .number({ + invalid_type_error: 'Snapshow max workers must be a number', + }) + .int() + .min(1, 'Snapshow max workers must be a positive integer') + .optional(), + snapshotNumTablesInParallel: z + .number({ + invalid_type_error: 'Snapshow parallel tables must be a number', + }) + .int() + .min(1, 'Snapshow parallel tables must be a positive integer') + .optional(), + snapshotStagingPath: z + .string({ + invalid_type_error: 'Snapshot staging path must be a string', + }) + .max(255, 'Snapshot staging path must be less than 255 characters') + .optional(), + cdcStagingPath: z + .string({ + invalid_type_error: 'CDC staging path must be a string', + }) + .max(255, 'CDC staging path must be less than 255 characters') + .optional(), + softDelete: z.boolean().optional(), +}); diff --git a/ui/app/mirrors/create/tablemapping.tsx b/ui/app/mirrors/create/tablemapping.tsx new file mode 100644 index 0000000000..37b0bef5d1 --- /dev/null +++ b/ui/app/mirrors/create/tablemapping.tsx @@ -0,0 +1,93 @@ +'use client'; +import { Button } from '@/lib/Button'; +import { Icon } from '@/lib/Icon'; +import { TextField } from '@/lib/TextField'; +import { Dispatch, SetStateAction } from 'react'; +import { TableMapRow } from '../types'; + +interface TableMappingProps { + rows: TableMapRow[]; + setRows: Dispatch>; +} +const TableMapping = ({ rows, setRows }: TableMappingProps) => { + const handleAddRow = () => { + setRows([...rows, { source: '', destination: '' }]); + }; + + const handleRemoveRow = (index: number) => { + if (rows.length === 1) { + return; + } + const newRows = [...rows]; + newRows.splice(index, 1); + setRows(newRows); + }; + + const handleTableChange = ( + index: number, + field: 'source' | 'destination', + value: string + ) => { + const newRows = [...rows]; + newRows[index][field] = value; + setRows(newRows); + }; + + return ( +
+ + + + + + + + + + {rows.map((row, index) => ( + + + + + + ))} + +
Source Table + Destination Table +
+ + handleTableChange(index, 'source', e.target.value) + } + /> + + + handleTableChange(index, 'destination', e.target.value) + } + /> + + +
+ +
+ ); +}; + +export default TableMapping; diff --git a/ui/app/mirrors/types.ts b/ui/app/mirrors/types.ts new file mode 100644 index 0000000000..879ee6a960 --- /dev/null +++ b/ui/app/mirrors/types.ts @@ -0,0 +1,6 @@ +import { FlowConnectionConfigs } from '@/grpc_generated/flow'; +import { Dispatch, SetStateAction } from 'react'; + +export type MirrorConfig = FlowConnectionConfigs; +export type MirrorSetter = Dispatch>; +export type TableMapRow = { source: string; destination: string }; diff --git a/ui/app/peers/create/configuration/helpers/common.ts b/ui/app/peers/create/configuration/helpers/common.ts index b36423bf4d..24851da780 100644 --- a/ui/app/peers/create/configuration/helpers/common.ts +++ b/ui/app/peers/create/configuration/helpers/common.ts @@ -2,7 +2,7 @@ import { PeerConfig, PeerSetter } from '../types'; import { blankPostgresSetting } from './pg'; import { blankSnowflakeSetting } from './sf'; -export interface Setting { +export interface PeerSetting { label: string; stateHandler: (value: string, setter: PeerSetter) => void; type?: string; diff --git a/ui/app/peers/create/configuration/helpers/pg.ts b/ui/app/peers/create/configuration/helpers/pg.ts index 8d287e3f25..84c464bf7a 100644 --- a/ui/app/peers/create/configuration/helpers/pg.ts +++ b/ui/app/peers/create/configuration/helpers/pg.ts @@ -1,7 +1,7 @@ import { PostgresConfig } from '@/grpc_generated/peers'; -import { Setting } from './common'; +import { PeerSetting } from './common'; -export const postgresSetting: Setting[] = [ +export const postgresSetting: PeerSetting[] = [ { label: 'Host', stateHandler: (value, setter) => diff --git a/ui/app/peers/create/configuration/helpers/sf.ts b/ui/app/peers/create/configuration/helpers/sf.ts index 0d90fd2c12..44cbde7fe8 100644 --- a/ui/app/peers/create/configuration/helpers/sf.ts +++ b/ui/app/peers/create/configuration/helpers/sf.ts @@ -1,7 +1,7 @@ import { SnowflakeConfig } from '@/grpc_generated/peers'; -import { Setting } from './common'; +import { PeerSetting } from './common'; -export const snowflakeSetting: Setting[] = [ +export const snowflakeSetting: PeerSetting[] = [ { label: 'Account ID', stateHandler: (value, setter) => diff --git a/ui/app/peers/create/configuration/page.tsx b/ui/app/peers/create/configuration/page.tsx index c0e75f3322..8e1eec6f79 100644 --- a/ui/app/peers/create/configuration/page.tsx +++ b/ui/app/peers/create/configuration/page.tsx @@ -11,7 +11,7 @@ import { useRouter, useSearchParams } from 'next/navigation'; import { useState } from 'react'; import ConfigForm from '../../../../components/ConfigForm'; import { handleCreate, handleValidate } from './handlers'; -import { Setting, getBlankSetting } from './helpers/common'; +import { PeerSetting, getBlankSetting } from './helpers/common'; import { postgresSetting } from './helpers/pg'; import { snowflakeSetting } from './helpers/sf'; import { PeerConfig } from './types'; @@ -28,7 +28,7 @@ export default function CreateConfig() { }); const [loading, setLoading] = useState(false); const configComponentMap = (dbType: string) => { - const configForm = (settingList: Setting[]) => ( + const configForm = (settingList: PeerSetting[]) => ( ); switch (dbType) { @@ -69,6 +69,7 @@ export default function CreateConfig() { action={ setName(e.target.value)} /> } diff --git a/ui/app/peers/create/configuration/schema.ts b/ui/app/peers/create/configuration/schema.ts index 08bf97a3d1..4134f08f1e 100644 --- a/ui/app/peers/create/configuration/schema.ts +++ b/ui/app/peers/create/configuration/schema.ts @@ -6,7 +6,7 @@ export const pgSchema = z.object({ required_error: 'Host is required', invalid_type_error: 'Host must be a string', }) - .nonempty() + .nonempty({ message: 'Host cannot be empty' }) .max(255, 'Host must be less than 255 characters'), port: z .number({ diff --git a/ui/app/peers/handler.ts b/ui/app/peers/handler.ts new file mode 100644 index 0000000000..824a4f51e6 --- /dev/null +++ b/ui/app/peers/handler.ts @@ -0,0 +1,8 @@ +import { ListPeersRequest } from '@/grpc_generated/route'; +import { GetFlowServiceClientFromEnv } from '@/rpc/rpc'; +export async function fetchPeers() { + let flowServiceClient = GetFlowServiceClientFromEnv(); + let req: ListPeersRequest = {}; + let peers = await flowServiceClient.listPeers(req); + return peers.peers; +} diff --git a/ui/app/peers/page.tsx b/ui/app/peers/page.tsx index 9951c55b4f..75fbafa617 100644 --- a/ui/app/peers/page.tsx +++ b/ui/app/peers/page.tsx @@ -1,5 +1,4 @@ import { Peer } from '@/grpc_generated/peers'; -import { ListPeersRequest } from '@/grpc_generated/route'; import { Button } from '@/lib/Button'; import { Checkbox } from '@/lib/Checkbox'; import { Icon } from '@/lib/Icon'; @@ -9,19 +8,13 @@ import { Panel } from '@/lib/Panel'; import { SearchField } from '@/lib/SearchField'; import { Select } from '@/lib/Select'; import { Table, TableCell, TableRow } from '@/lib/Table'; -import { GetFlowServiceClientFromEnv } from '@/rpc/rpc'; +import { fetchPeers } from './handler'; + import Link from 'next/link'; import { Suspense } from 'react'; import { Header } from '../../lib/Header'; export const dynamic = 'force-dynamic'; -async function fetchPeers() { - let flowServiceClient = GetFlowServiceClientFromEnv(); - let req: ListPeersRequest = {}; - let peers = await flowServiceClient.listPeers(req); - return peers.peers; -} - function PeerRow({ peer }: { peer: Peer }) { return ( diff --git a/ui/components/ConfigForm.tsx b/ui/components/ConfigForm.tsx index e9e20a27dd..b12b94f1cc 100644 --- a/ui/components/ConfigForm.tsx +++ b/ui/components/ConfigForm.tsx @@ -1,14 +1,14 @@ 'use client'; -import { Setting } from '@/app/peers/create/configuration/helpers/common'; +import { PeerSetting } from '@/app/peers/create/configuration/helpers/common'; +import { PeerSetter } from '@/app/peers/create/configuration/types'; import { Label } from '@/lib/Label'; import { RowWithTextField } from '@/lib/Layout'; import { TextField } from '@/lib/TextField'; import { Tooltip } from '@/lib/Tooltip'; -import { PeerSetter } from '../app/peers/create/configuration/types'; import { InfoPopover } from './InfoPopover'; interface ConfigProps { - settings: Setting[]; + settings: PeerSetting[]; setter: PeerSetter; } @@ -31,7 +31,7 @@ export default function ConfigForm(props: ConfigProps) { const handleChange = ( e: React.ChangeEvent, - setting: Setting + setting: PeerSetting ) => { if (setting.type === 'file') { if (e.target.files) handleFile(e.target.files[0], setting.stateHandler); diff --git a/ui/grpc_generated/route.ts b/ui/grpc_generated/route.ts index 64dace7ec2..6874a48d93 100644 --- a/ui/grpc_generated/route.ts +++ b/ui/grpc_generated/route.ts @@ -97,6 +97,7 @@ export function createPeerStatusToJSON(object: CreatePeerStatus): string { export interface CreateCDCFlowRequest { connectionConfigs: FlowConnectionConfigs | undefined; + createCatalogEntry: boolean; } export interface CreateCDCFlowResponse { @@ -149,7 +150,7 @@ export interface CreatePeerResponse { } function createBaseCreateCDCFlowRequest(): CreateCDCFlowRequest { - return { connectionConfigs: undefined }; + return { connectionConfigs: undefined, createCatalogEntry: false }; } export const CreateCDCFlowRequest = { @@ -157,6 +158,9 @@ export const CreateCDCFlowRequest = { if (message.connectionConfigs !== undefined) { FlowConnectionConfigs.encode(message.connectionConfigs, writer.uint32(10).fork()).ldelim(); } + if (message.createCatalogEntry === true) { + writer.uint32(16).bool(message.createCatalogEntry); + } return writer; }, @@ -174,6 +178,13 @@ export const CreateCDCFlowRequest = { message.connectionConfigs = FlowConnectionConfigs.decode(reader, reader.uint32()); continue; + case 2: + if (tag !== 16) { + break; + } + + message.createCatalogEntry = reader.bool(); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -188,6 +199,7 @@ export const CreateCDCFlowRequest = { connectionConfigs: isSet(object.connectionConfigs) ? FlowConnectionConfigs.fromJSON(object.connectionConfigs) : undefined, + createCatalogEntry: isSet(object.createCatalogEntry) ? Boolean(object.createCatalogEntry) : false, }; }, @@ -196,6 +208,9 @@ export const CreateCDCFlowRequest = { if (message.connectionConfigs !== undefined) { obj.connectionConfigs = FlowConnectionConfigs.toJSON(message.connectionConfigs); } + if (message.createCatalogEntry === true) { + obj.createCatalogEntry = message.createCatalogEntry; + } return obj; }, @@ -207,6 +222,7 @@ export const CreateCDCFlowRequest = { message.connectionConfigs = (object.connectionConfigs !== undefined && object.connectionConfigs !== null) ? FlowConnectionConfigs.fromPartial(object.connectionConfigs) : undefined; + message.createCatalogEntry = object.createCatalogEntry ?? false; return message; }, };