Skip to content

Commit

Permalink
Fix rest of ui error messages
Browse files Browse the repository at this point in the history
Now to actually make it work
  • Loading branch information
serprex committed Jun 17, 2024
1 parent 4685b84 commit 640396c
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 152 deletions.
3 changes: 1 addition & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,7 @@ func PullCdcRecords[Items model.Items](
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

addRecordWithKey := func(key model.TableWithPkey, rec model.Record[Items]) error {
err := cdcRecordsStorage.Set(logger, key, rec)
if err != nil {
if err := cdcRecordsStorage.Set(logger, key, rec); err != nil {
return err
}
if err := records.AddRecord(ctx, rec); err != nil {
Expand Down
11 changes: 5 additions & 6 deletions ui/app/api/peers/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { getTruePeer } from '@/app/api/peers/getTruePeer';
import {
CatalogPeer,
PeerConfig,
UCreatePeerResponse,
UValidatePeerResponse,
Expand Down Expand Up @@ -151,8 +149,9 @@ export async function POST(request: Request) {
}

// GET all the peers from the database
export async function GET(request: Request) {
const peers = await prisma.peers.findMany();
const truePeers: Peer[] = peers.map((peer: CatalogPeer) => getTruePeer(peer));
return new Response(JSON.stringify(truePeers));
export async function GET(_request: Request) {
const peers = await prisma.peers.findMany({
select: { name: true, type: true },
});
return new Response(JSON.stringify(peers));
}
2 changes: 2 additions & 0 deletions ui/app/dto/PeersDTO.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
BigqueryConfig,
ClickhouseConfig,
DBType,
ElasticsearchConfig,
EventHubConfig,
EventHubGroupConfig,
Expand Down Expand Up @@ -60,6 +61,7 @@ export type CatalogPeer = {
type: number;
options: Buffer;
};
export type PeerRef = { name: string; type: DBType };
export type PeerSetter = React.Dispatch<React.SetStateAction<PeerConfig>>;

export type SlotLagPoint = {
Expand Down
62 changes: 33 additions & 29 deletions ui/app/mirrors/create/cdc/cdc.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import TableMapping from './tablemapping';
interface MirrorConfigProps {
settings: MirrorSetting[];
mirrorConfig: CDCConfig;
destinationType: DBType;
sourceType: DBType;
setter: MirrorSetter;
rows: TableMapRow[];
setRows: Dispatch<SetStateAction<TableMapRow[]>>;
Expand All @@ -33,6 +35,8 @@ export const defaultSyncMode = (dtype: DBType | undefined) => {
export default function CDCConfigForm({
settings,
mirrorConfig,
destinationType,
sourceType,
setter,
rows,
setRows,
Expand All @@ -45,22 +49,24 @@ export default function CDCConfigForm({
setting.stateHandler(stateVal, setter);
};

const normalSettings = useMemo(() => {
return settings!.filter(
(setting) =>
!(
(IsQueuePeer(mirrorConfig.destination?.type) &&
setting.advanced === AdvancedSettingType.QUEUE) ||
setting.advanced === AdvancedSettingType.ALL
)
);
}, [settings, mirrorConfig.destination?.type]);
const normalSettings = useMemo(
() =>
settings!.filter(
(setting) =>
!(
(IsQueuePeer(destinationType) &&
setting.advanced === AdvancedSettingType.QUEUE) ||
setting.advanced === AdvancedSettingType.ALL
)
),
[settings, destinationType]
);

const advancedSettings = useMemo(() => {
return settings!
.map((setting) => {
if (
IsQueuePeer(mirrorConfig.destination?.type) &&
IsQueuePeer(destinationType) &&
setting.advanced === AdvancedSettingType.QUEUE
) {
setting.stateHandler(600, setter);
Expand All @@ -71,30 +77,29 @@ export default function CDCConfigForm({
}
})
.filter((setting) => setting !== undefined);
}, [settings, mirrorConfig.destination?.type, setter]);
}, [settings, destinationType, setter]);

const paramDisplayCondition = (setting: MirrorSetting) => {
const label = setting.label.toLowerCase();
const isQueue = IsQueuePeer(mirrorConfig.destination?.type);
const isQueue = IsQueuePeer(destinationType);
if (
(label.includes('snapshot') && mirrorConfig.doInitialSnapshot !== true) ||
(label === 'replication slot name' &&
mirrorConfig.doInitialSnapshot === true) ||
(label.includes('staging path') &&
defaultSyncMode(mirrorConfig.destination?.type) !== 'AVRO') ||
defaultSyncMode(destinationType) !== 'AVRO') ||
(isQueue && label.includes('soft delete')) ||
(mirrorConfig.destination?.type === DBType.EVENTHUBS &&
(destinationType === DBType.EVENTHUBS &&
(label.includes('initial copy') ||
label.includes('initial load') ||
label.includes('snapshot'))) ||
((mirrorConfig.source?.type !== DBType.POSTGRES ||
mirrorConfig.destination?.type !== DBType.POSTGRES) &&
((sourceType !== DBType.POSTGRES ||
destinationType !== DBType.POSTGRES) &&
label.includes('type system')) ||
(mirrorConfig.destination?.type !== DBType.BIGQUERY &&
label.includes('column name')) ||
(destinationType !== DBType.BIGQUERY && label.includes('column name')) ||
(label.includes('soft delete') &&
![DBType.BIGQUERY, DBType.POSTGRES, DBType.SNOWFLAKE].includes(
mirrorConfig.destination?.type ?? DBType.UNRECOGNIZED
destinationType ?? DBType.UNRECOGNIZED
))
) {
return false;
Expand All @@ -104,17 +109,17 @@ export default function CDCConfigForm({

useEffect(() => {
setPubLoading(true);
fetchPublications(mirrorConfig.source?.name || '').then((pubs) => {
fetchPublications(mirrorConfig.source || '').then((pubs) => {
setPublications(pubs);
setPubLoading(false);
});
}, [mirrorConfig.source?.name]);
}, [mirrorConfig.source]);

if (mirrorConfig.source != undefined && mirrorConfig.destination != undefined)
return (
<>
{normalSettings!.map((setting, id) => {
return (
{normalSettings!.map(
(setting, id) =>
paramDisplayCondition(setting!) && (
<CDCField
key={id}
Expand All @@ -128,8 +133,7 @@ export default function CDCConfigForm({
optionsLoading={pubLoading}
/>
)
);
})}
)}
<Button
className='IconButton'
aria-label='collapse'
Expand All @@ -150,7 +154,7 @@ export default function CDCConfigForm({
</Button>

{show &&
advancedSettings!.map((setting, id) => {
advancedSettings!.map((setting) => {
return (
paramDisplayCondition(setting!) && (
<CDCField
Expand All @@ -163,10 +167,10 @@ export default function CDCConfigForm({
})}

<TableMapping
sourcePeerName={mirrorConfig.source?.name}
sourcePeerName={mirrorConfig.source}
rows={rows}
setRows={setRows}
peerType={mirrorConfig.destination?.type}
peerType={destinationType}
omitAdditionalTablesMapping={new Map<string, string[]>()}
/>
</>
Expand Down
51 changes: 16 additions & 35 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import {
QRepConfig,
QRepWriteType,
} from '@/grpc_generated/flow';
import { DBType, Peer, dBTypeToJSON } from '@/grpc_generated/peers';
import { DBType, dBTypeToJSON } from '@/grpc_generated/peers';
import { Dispatch, SetStateAction } from 'react';

import { CDCConfig, TableMapRow } from '../../dto/MirrorsDTO';

import {
cdcSchema,
flowNameSchema,
Expand All @@ -33,31 +35,11 @@ export const IsQueuePeer = (peerType?: DBType): boolean => {
);
};

export const handlePeer = (
peer: Peer | null,
peerEnd: 'src' | 'dst',
setConfig: (value: SetStateAction<CDCConfig | QRepConfig>) => void
) => {
if (!peer) return;
if (peerEnd === 'dst') {
setConfig((curr) => ({
...curr,
destination: peer,
destinationPeer: peer,
}));
} else {
setConfig((curr) => ({
...curr,
source: peer,
sourcePeer: peer,
}));
}
};

const CDCCheck = (
flowJobName: string,
rows: TableMapRow[],
config: CDCConfig
config: CDCConfig,
destinationType: DBType
) => {
const flowNameValid = flowNameSchema.safeParse(flowJobName);
if (!flowNameValid.success) {
Expand All @@ -82,7 +64,7 @@ const CDCCheck = (
config.replicationSlotName = '';
}

if (IsQueuePeer(config.destination?.type)) {
if (IsQueuePeer(destinationType)) {
config.softDelete = false;
}

Expand Down Expand Up @@ -163,10 +145,11 @@ export const handleCreateCDC = async (
flowJobName: string,
rows: TableMapRow[],
config: CDCConfig,
destinationType: DBType,
setLoading: Dispatch<SetStateAction<boolean>>,
route: RouteCallback
) => {
const err = CDCCheck(flowJobName, rows, config);
const err = CDCCheck(flowJobName, rows, config, destinationType);
if (err != '') {
notifyErr(err);
return;
Expand Down Expand Up @@ -202,6 +185,7 @@ export const handleCreateQRep = async (
flowJobName: string,
query: string,
config: QRepConfig,
destinationType: DBType,
setLoading: Dispatch<SetStateAction<boolean>>,
route: RouteCallback,
xmin?: boolean
Expand Down Expand Up @@ -255,14 +239,12 @@ export const handleCreateQRep = async (
config.query = query;

const isSchemaLessPeer =
config.destinationPeer?.type === DBType.BIGQUERY ||
config.destinationPeer?.type === DBType.CLICKHOUSE;
if (config.destinationPeer?.type !== DBType.ELASTICSEARCH) {
destinationType === DBType.BIGQUERY ||
destinationType === DBType.CLICKHOUSE;
if (destinationType !== DBType.ELASTICSEARCH) {
if (isSchemaLessPeer && config.destinationTableIdentifier?.includes('.')) {
notifyErr(
'Destination table should not be schema qualified for ' +
DBTypeToGoodText(config.destinationPeer?.type) +
' targets'
`Destination table should not be schema qualified for ${DBTypeToGoodText(destinationType)} targets`
);
return;
}
Expand All @@ -271,9 +253,7 @@ export const handleCreateQRep = async (
!config.destinationTableIdentifier?.includes('.')
) {
notifyErr(
'Destination table should be schema qualified for ' +
DBTypeToGoodText(config.destinationPeer?.type) +
' targets'
`Destination table should be schema qualified for ${DBTypeToGoodText(destinationType)} targets`
);
return;
}
Expand Down Expand Up @@ -428,10 +408,11 @@ export const handleValidateCDC = async (
flowJobName: string,
rows: TableMapRow[],
config: CDCConfig,
destinationType: DBType,
setLoading: Dispatch<SetStateAction<boolean>>
) => {
setLoading(true);
const err = CDCCheck(flowJobName, rows, config);
const err = CDCCheck(flowJobName, rows, config, destinationType);
if (err != '') {
notifyErr(err);
setLoading(false);
Expand Down
4 changes: 2 additions & 2 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ export interface MirrorSetting {
}

export const blankCDCSetting: CDCConfig = {
source: undefined,
destination: undefined,
source: '',
destination: '',
flowJobName: '',
tableMappings: [],
maxBatchSize: 1000000,
Expand Down
Loading

0 comments on commit 640396c

Please sign in to comment.