Skip to content

Commit

Permalink
UI - Create Mirror (#490)
Browse files Browse the repository at this point in the history
- Implements Create Mirror user flow
- Tested happy path by running a successful PG-SF mirror
- Snapshot fields appear only when initial copy is toggled
- Staging path fields appear only when sync mode is avro
- Frontend validation tested

Fixes #432 and #433
  • Loading branch information
Amogh-Bharadwaj authored Oct 9, 2023
1 parent dd178cc commit 1b0353c
Show file tree
Hide file tree
Showing 28 changed files with 933 additions and 215 deletions.
59 changes: 59 additions & 0 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"strings"
"time"

"github.com/PeerDB-io/peer-flow/connectors"
Expand Down Expand Up @@ -31,6 +32,57 @@ func NewFlowRequestHandler(temporalClient client.Client, pool *pgxpool.Pool) *Fl
}
}

func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (int32, int32, error) {
var id int32
var peerType int32
err := h.pool.QueryRow(ctx, "SELECT id,type FROM peers WHERE name = $1", peerName).Scan(&id, &peerType)
if err != nil {
log.Errorf("unable to query peer id for peer %s: %s", peerName, err.Error())
return -1, -1, fmt.Errorf("unable to query peer id for peer %s: %s", peerName, err)
}
return id, peerType, nil
}

func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
tableIdentifierParts := strings.Split(tableIdentifier, ".")
if len(tableIdentifierParts) == 1 && peerDBType != int32(protos.DBType_BIGQUERY) {
tableIdentifierParts = append([]string{"public"}, tableIdentifierParts...)
}
return strings.Join(tableIdentifierParts, ".")
}

func (h *FlowRequestHandler) createFlowJobEntry(ctx context.Context,
req *protos.CreateCDCFlowRequest, workflowID string) error {
sourcePeerID, sourePeerType, srcErr := h.getPeerID(ctx, req.ConnectionConfigs.Source.Name)
if srcErr != nil {
return fmt.Errorf("unable to get peer id for source peer %s: %w",
req.ConnectionConfigs.Source.Name, srcErr)
}

destinationPeerID, destinationPeerType, dstErr := h.getPeerID(ctx, req.ConnectionConfigs.Destination.Name)
if dstErr != nil {
return fmt.Errorf("unable to get peer id for target peer %s: %w",
req.ConnectionConfigs.Destination.Name, srcErr)
}

for sourceTableIdentifier := range req.ConnectionConfigs.TableNameMapping {
destinationTableIdentifier := req.ConnectionConfigs.TableNameMapping[sourceTableIdentifier]
_, err := h.pool.Exec(ctx, `
INSERT INTO flows (workflow_id, name, source_peer, destination_peer, description,
source_table_identifier, destination_table_identifier) VALUES ($1, $2, $3, $4, $5, $6, $7)
`, workflowID, req.ConnectionConfigs.FlowJobName, sourcePeerID, destinationPeerID,
"Mirror created via GRPC",
schemaForTableIdentifier(sourceTableIdentifier, sourePeerType),
schemaForTableIdentifier(destinationTableIdentifier, destinationPeerType))
if err != nil {
return fmt.Errorf("unable to insert into flows table for flow %s with source table %s: %w",
req.ConnectionConfigs.FlowJobName, sourceTableIdentifier, err)
}
}

return nil
}

// Close closes the connection pool
func (h *FlowRequestHandler) Close() {
if h.pool != nil {
Expand Down Expand Up @@ -59,6 +111,13 @@ func (h *FlowRequestHandler) CreateCDCFlow(
MaxBatchSize: maxBatchSize,
}

if req.CreateCatalogEntry {
err := h.createFlowJobEntry(ctx, req, workflowID)
if err != nil {
return nil, fmt.Errorf("unable to create flow job entry: %w", err)
}
}

state := peerflow.NewCDCFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
Expand Down
235 changes: 123 additions & 112 deletions flow/generated/protos/route.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,14 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
match create_mirror {
CDC(cdc) => {
let mut flow_job_table_mappings = vec![];
for table_mapping in &cdc.table_mappings {
for table_mapping in &cdc.mappings {
flow_job_table_mappings.push(FlowJobTableMapping {
source_table_identifier: table_mapping
.source_table_identifier
.to_string()
.to_lowercase(),
target_table_identifier: table_mapping
.target_table_identifier
.target_identifier
.to_string()
.to_lowercase(),
});
Expand Down
1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl FlowGrpcClient {
) -> anyhow::Result<String> {
let create_peer_flow_req = pt::peerdb_route::CreateCdcFlowRequest {
connection_configs: Some(peer_flow_config),
create_catalog_entry: false
};
let response = self.client.create_cdc_flow(create_peer_flow_req).await?;
let workflow_id = response.into_inner().worflow_id;
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub struct CreateCdcFlowRequest {
#[prost(message, optional, tag="1")]
pub connection_configs: ::core::option::Option<super::peerdb_flow::FlowConnectionConfigs>,
#[prost(bool, tag="2")]
pub create_catalog_entry: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_route.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@ impl serde::Serialize for CreateCdcFlowRequest {
if self.connection_configs.is_some() {
len += 1;
}
if self.create_catalog_entry {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_route.CreateCDCFlowRequest", len)?;
if let Some(v) = self.connection_configs.as_ref() {
struct_ser.serialize_field("connectionConfigs", v)?;
}
if self.create_catalog_entry {
struct_ser.serialize_field("createCatalogEntry", &self.create_catalog_entry)?;
}
struct_ser.end()
}
}
Expand All @@ -26,11 +32,14 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
const FIELDS: &[&str] = &[
"connection_configs",
"connectionConfigs",
"create_catalog_entry",
"createCatalogEntry",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
ConnectionConfigs,
CreateCatalogEntry,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -54,6 +63,7 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
{
match value {
"connectionConfigs" | "connection_configs" => Ok(GeneratedField::ConnectionConfigs),
"createCatalogEntry" | "create_catalog_entry" => Ok(GeneratedField::CreateCatalogEntry),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -74,6 +84,7 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
V: serde::de::MapAccess<'de>,
{
let mut connection_configs__ = None;
let mut create_catalog_entry__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::ConnectionConfigs => {
Expand All @@ -82,13 +93,20 @@ impl<'de> serde::Deserialize<'de> for CreateCdcFlowRequest {
}
connection_configs__ = map.next_value()?;
}
GeneratedField::CreateCatalogEntry => {
if create_catalog_entry__.is_some() {
return Err(serde::de::Error::duplicate_field("createCatalogEntry"));
}
create_catalog_entry__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
}
}
Ok(CreateCdcFlowRequest {
connection_configs: connection_configs__,
create_catalog_entry: create_catalog_entry__.unwrap_or_default(),
})
}
}
Expand Down
3 changes: 1 addition & 2 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,7 @@ impl NexusBackend {
"got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);
if workflow_details.is_some() {
let workflow_details = workflow_details.unwrap();
if let Some(workflow_details) = workflow_details {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
flow_handler
.shutdown_flow_job(flow_job_name, workflow_details)
Expand Down
2 changes: 1 addition & 1 deletion nexus/sqlparser-rs
1 change: 1 addition & 0 deletions protos/route.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package peerdb_route;

message CreateCDCFlowRequest {
peerdb_flow.FlowConnectionConfigs connection_configs = 1;
bool create_catalog_entry = 2;
}

message CreateCDCFlowResponse {
Expand Down
21 changes: 21 additions & 0 deletions ui/app/api/mirrors/cdc/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import {
CreateCDCFlowRequest,
CreateCDCFlowResponse,
} from '@/grpc_generated/route';
import { GetFlowServiceClientFromEnv } from '@/rpc/rpc';

export async function POST(request: Request) {
const body = await request.json();
const { config } = body;
const flowServiceClient = GetFlowServiceClientFromEnv();
const req: CreateCDCFlowRequest = {
connectionConfigs: config,
createCatalogEntry: true,
};
const createStatus: CreateCDCFlowResponse =
await flowServiceClient.createCdcFlow(req);
if (!createStatus.worflowId) {
return new Response('Failed to create CDC mirror');
}
return new Response('created');
}
8 changes: 8 additions & 0 deletions ui/app/api/peers/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
CreatePeerRequest,
CreatePeerResponse,
CreatePeerStatus,
ListPeersRequest,
ValidatePeerRequest,
ValidatePeerResponse,
ValidatePeerStatus,
Expand Down Expand Up @@ -63,3 +64,10 @@ export async function POST(request: Request) {
} else return new Response('status of peer creation is unknown');
} else return new Response('mode of peer creation is unknown');
}

export async function GET(request: Request) {
let flowServiceClient = GetFlowServiceClientFromEnv();
let req: ListPeersRequest = {};
let peers = await flowServiceClient.listPeers(req);
return new Response(JSON.stringify(peers));
}
Loading

0 comments on commit 1b0353c

Please sign in to comment.