diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 7f312e2012..47c524bb27 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -342,8 +342,7 @@ func PullCdcRecords[Items model.Items]( nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout) addRecordWithKey := func(key model.TableWithPkey, rec model.Record[Items]) error { - err := cdcRecordsStorage.Set(logger, key, rec) - if err != nil { + if err := cdcRecordsStorage.Set(logger, key, rec); err != nil { return err } if err := records.AddRecord(ctx, rec); err != nil { diff --git a/ui/app/api/peers/route.ts b/ui/app/api/peers/route.ts index 4f468d8fb2..607f3658ae 100644 --- a/ui/app/api/peers/route.ts +++ b/ui/app/api/peers/route.ts @@ -1,6 +1,4 @@ -import { getTruePeer } from '@/app/api/peers/getTruePeer'; import { - CatalogPeer, PeerConfig, UCreatePeerResponse, UValidatePeerResponse, @@ -151,8 +149,9 @@ export async function POST(request: Request) { } // GET all the peers from the database -export async function GET(request: Request) { - const peers = await prisma.peers.findMany(); - const truePeers: Peer[] = peers.map((peer: CatalogPeer) => getTruePeer(peer)); - return new Response(JSON.stringify(truePeers)); +export async function GET(_request: Request) { + const peers = await prisma.peers.findMany({ + select: { name: true, type: true }, + }); + return new Response(JSON.stringify(peers)); } diff --git a/ui/app/dto/PeersDTO.ts b/ui/app/dto/PeersDTO.ts index abc3bc80fb..81c559e77c 100644 --- a/ui/app/dto/PeersDTO.ts +++ b/ui/app/dto/PeersDTO.ts @@ -1,6 +1,7 @@ import { BigqueryConfig, ClickhouseConfig, + DBType, ElasticsearchConfig, EventHubConfig, EventHubGroupConfig, @@ -60,6 +61,7 @@ export type CatalogPeer = { type: number; options: Buffer; }; +export type PeerRef = { name: string; type: DBType }; export type PeerSetter = React.Dispatch>; export type SlotLagPoint = { diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index d55ba671ff..a02c41dfa0 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -12,6 +12,8 @@ import TableMapping from './tablemapping'; interface MirrorConfigProps { settings: MirrorSetting[]; mirrorConfig: CDCConfig; + destinationType: DBType; + sourceType: DBType; setter: MirrorSetter; rows: TableMapRow[]; setRows: Dispatch>; @@ -33,6 +35,8 @@ export const defaultSyncMode = (dtype: DBType | undefined) => { export default function CDCConfigForm({ settings, mirrorConfig, + destinationType, + sourceType, setter, rows, setRows, @@ -45,22 +49,24 @@ export default function CDCConfigForm({ setting.stateHandler(stateVal, setter); }; - const normalSettings = useMemo(() => { - return settings!.filter( - (setting) => - !( - (IsQueuePeer(mirrorConfig.destination?.type) && - setting.advanced === AdvancedSettingType.QUEUE) || - setting.advanced === AdvancedSettingType.ALL - ) - ); - }, [settings, mirrorConfig.destination?.type]); + const normalSettings = useMemo( + () => + settings!.filter( + (setting) => + !( + (IsQueuePeer(destinationType) && + setting.advanced === AdvancedSettingType.QUEUE) || + setting.advanced === AdvancedSettingType.ALL + ) + ), + [settings, destinationType] + ); const advancedSettings = useMemo(() => { return settings! .map((setting) => { if ( - IsQueuePeer(mirrorConfig.destination?.type) && + IsQueuePeer(destinationType) && setting.advanced === AdvancedSettingType.QUEUE ) { setting.stateHandler(600, setter); @@ -71,30 +77,29 @@ export default function CDCConfigForm({ } }) .filter((setting) => setting !== undefined); - }, [settings, mirrorConfig.destination?.type, setter]); + }, [settings, destinationType, setter]); const paramDisplayCondition = (setting: MirrorSetting) => { const label = setting.label.toLowerCase(); - const isQueue = IsQueuePeer(mirrorConfig.destination?.type); + const isQueue = IsQueuePeer(destinationType); if ( (label.includes('snapshot') && mirrorConfig.doInitialSnapshot !== true) || (label === 'replication slot name' && mirrorConfig.doInitialSnapshot === true) || (label.includes('staging path') && - defaultSyncMode(mirrorConfig.destination?.type) !== 'AVRO') || + defaultSyncMode(destinationType) !== 'AVRO') || (isQueue && label.includes('soft delete')) || - (mirrorConfig.destination?.type === DBType.EVENTHUBS && + (destinationType === DBType.EVENTHUBS && (label.includes('initial copy') || label.includes('initial load') || label.includes('snapshot'))) || - ((mirrorConfig.source?.type !== DBType.POSTGRES || - mirrorConfig.destination?.type !== DBType.POSTGRES) && + ((sourceType !== DBType.POSTGRES || + destinationType !== DBType.POSTGRES) && label.includes('type system')) || - (mirrorConfig.destination?.type !== DBType.BIGQUERY && - label.includes('column name')) || + (destinationType !== DBType.BIGQUERY && label.includes('column name')) || (label.includes('soft delete') && ![DBType.BIGQUERY, DBType.POSTGRES, DBType.SNOWFLAKE].includes( - mirrorConfig.destination?.type ?? DBType.UNRECOGNIZED + destinationType ?? DBType.UNRECOGNIZED )) ) { return false; @@ -104,17 +109,17 @@ export default function CDCConfigForm({ useEffect(() => { setPubLoading(true); - fetchPublications(mirrorConfig.source?.name || '').then((pubs) => { + fetchPublications(mirrorConfig.source || '').then((pubs) => { setPublications(pubs); setPubLoading(false); }); - }, [mirrorConfig.source?.name]); + }, [mirrorConfig.source]); if (mirrorConfig.source != undefined && mirrorConfig.destination != undefined) return ( <> - {normalSettings!.map((setting, id) => { - return ( + {normalSettings!.map( + (setting, id) => paramDisplayCondition(setting!) && ( ) - ); - })} + )}