Skip to content

Commit

Permalink
Validate mirror: account for resync more (#2044)
Browse files Browse the repository at this point in the history
Resync currently does:
1. Drop mirror
2. Create mirror - which in turn does Validate mirror

If validate mirror fails here for some reason (an example of which is
fixed in this PR), then the UX is completely messed up because the
mirror is dropped and the user does not have any idea of what went wrong

This PR calls validate mirror explicitly in the Resync endpoint before
dropping the mirror, returning any validation errors. The subsequent
validate call in Create Mirror is put behind a !resync guard so that
this new mirror gets created

Second thing this PR does is not perform destination table emptiness and
engine check for Clickhouse mirror validation in case of a resync,
because those tables will get swapped out with the _resync tables whose
structure and emptiness PeerDB fully controls because of CREATE OR
REPLACE

Thirdly, this PR improves UI/UX of resync in PeerDB UI to now keep the
resync dialog open so that all user-facing messages are visible. Resync
and close buttons are disabled until resync has been kicked off.
Messages changed to be more accurate (Ex: Resyncing... changed to
Preparing resync..)

<img width="1549" alt="Screenshot 2024-09-05 at 4 10 29 AM"
src="https://github.com/user-attachments/assets/826ad26c-7a83-4dd2-b899-a0a58d2c5df5">

<img width="1593" alt="Screenshot 2024-09-05 at 4 33 49 AM"
src="https://github.com/user-attachments/assets/3abbda8d-0503-41f3-ad78-bbb327102460">
  • Loading branch information
Amogh-Bharadwaj authored Sep 5, 2024
1 parent 2a2240b commit e9e012c
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 49 deletions.
25 changes: 19 additions & 6 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,15 @@ func (h *FlowRequestHandler) CreateCDCFlow(
ctx context.Context, req *protos.CreateCDCFlowRequest,
) (*protos.CreateCDCFlowResponse, error) {
cfg := req.ConnectionConfigs
_, validateErr := h.ValidateCDCMirror(ctx, req)
if validateErr != nil {
slog.Error("validate mirror error", slog.Any("error", validateErr))
return nil, fmt.Errorf("invalid mirror: %w", validateErr)

// For resync, we validate the mirror before dropping it and getting to this step.
// There is no point validating again here if it's a resync - the mirror is dropped already
if !cfg.Resync {
_, validateErr := h.ValidateCDCMirror(ctx, req)
if validateErr != nil {
slog.Error("validate mirror error", slog.Any("error", validateErr))
return nil, fmt.Errorf("invalid mirror: %w", validateErr)
}
}

workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
Expand Down Expand Up @@ -554,12 +559,20 @@ func (h *FlowRequestHandler) ResyncMirror(
return nil, err
}

config.Resync = true
config.DoInitialSnapshot = true
// validate mirror first because once the mirror is dropped, there's no going back
_, err = h.ValidateCDCMirror(ctx, &protos.CreateCDCFlowRequest{
ConnectionConfigs: config,
})
if err != nil {
return nil, err
}

err = h.shutdownFlow(ctx, req.FlowJobName, req.DropStats)
if err != nil {
return nil, err
}
config.Resync = true
config.DoInitialSnapshot = true

_, err = h.CreateCDCFlow(ctx, &protos.CreateCDCFlowRequest{
ConnectionConfigs: config,
Expand Down
11 changes: 8 additions & 3 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,14 @@ func (c *ClickhouseConnector) CheckDestinationTables(ctx context.Context, req *p
// this is for handling column exclusion, processed schema does that in a step
processedMapping := shared.BuildProcessedSchemaMapping(req.TableMappings, tableNameSchemaMapping, c.logger)
dstTableNames := slices.Collect(maps.Keys(processedMapping))
err := c.checkTablesEmptyAndEngine(ctx, dstTableNames)
if err != nil {
return err

// In the case of resync, we don't need to check the content or structure of the original tables;
// they'll anyways get swapped out with the _resync tables which we CREATE OR REPLACE
if !req.Resync {
err := c.checkTablesEmptyAndEngine(ctx, dstTableNames)
if err != nil {
return err
}
}
// optimization: fetching columns for all tables at once
chTableColumnsMapping, err := c.getTableColumnsMapping(ctx, dstTableNames)
Expand Down
57 changes: 45 additions & 12 deletions ui/components/MirrorActionsDropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import EditButton from '@/components/EditButton';
import { ResyncDialog } from '@/components/ResyncDialog';
import { FlowStatus } from '@/grpc_generated/flow';
import { MirrorStatusResponse } from '@/grpc_generated/route';
import { Select } from '@tremor/react';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
import { useEffect, useState } from 'react';
import { CSSProperties } from 'styled-components';
import PauseOrResumeButton from './PauseOrResumeButton';

type MirrorActionsProps = {
Expand All @@ -15,6 +17,18 @@ type MirrorActionsProps = {
isNotPaused: boolean;
};

const menuStyle: CSSProperties = {
display: 'flex',
flexDirection: 'column',
position: 'absolute',
top: '50px',
right: '0',
backgroundColor: 'white',
border: '1px solid #ccc',
borderRadius: '5px',
boxShadow: '0 0 5px rgba(0,0,0,0.2)',
};

const MirrorActions = ({
mirrorName,
editLink,
Expand All @@ -24,6 +38,9 @@ const MirrorActions = ({
const [mirrorStatus, setMirrorStatus] = useState<FlowStatus>();
const [mounted, setMounted] = useState(false);

const [showOptions, setShowOptions] = useState(false);
const handleButtonClick = () => setShowOptions(!showOptions);

useEffect(() => {
getMirrorState(mirrorName).then((res: MirrorStatusResponse) => {
setMirrorStatus(res.currentFlowState);
Expand All @@ -33,17 +50,33 @@ const MirrorActions = ({

if (mounted)
return (
<div>
<Select placeholder='Actions' value='Actions'>
{mirrorStatus && (
<PauseOrResumeButton
mirrorName={mirrorName}
mirrorStatus={mirrorStatus}
/>
)}
<EditButton toLink={editLink} disabled={isNotPaused} />
{canResync && <ResyncDialog mirrorName={mirrorName} />}
</Select>
<div
style={{
position: 'relative',
display: 'flex',
flexDirection: 'column',
alignItems: 'center',
width: '10rem',
}}
>
<Button
onClick={handleButtonClick}
style={{ backgroundColor: '#F8F8F8' }}
>
Actions <Icon name='arrow_drop_down' />
</Button>
{showOptions && (
<div style={menuStyle}>
{mirrorStatus && (
<PauseOrResumeButton
mirrorName={mirrorName}
mirrorStatus={mirrorStatus}
/>
)}
<EditButton toLink={editLink} disabled={isNotPaused} />
{canResync && <ResyncDialog mirrorName={mirrorName} />}
</div>
)}
</div>
);
return <></>;
Expand Down
71 changes: 43 additions & 28 deletions ui/components/ResyncDialog.tsx
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
'use client';
import {
ResyncMirrorRequest,
ResyncMirrorResponse,
} from '@/grpc_generated/route';
import { ResyncMirrorRequest } from '@/grpc_generated/route';
import { Button } from '@/lib/Button';
import { Dialog, DialogClose } from '@/lib/Dialog';
import { Label } from '@/lib/Label';
Expand All @@ -16,30 +13,38 @@ type ResyncDialogProps = {

export const ResyncDialog = ({ mirrorName }: ResyncDialogProps) => {
const [syncing, setSyncing] = useState(false);
const [msg, setMsg] = useState('');
const [msg, setMsg] = useState<{
msg: string;
color: 'positive' | 'destructive' | 'base';
}>({
msg: '',
color: 'base',
});

const handleResync = async () => {
setSyncing(true);
setMsg('Resyncing...');
const resyncRes: ResyncMirrorResponse = await fetch(
'/api/v1/mirrors/resync',
{
method: 'POST',
body: JSON.stringify({
flowJobName: mirrorName,
dropStats: true,
} as ResyncMirrorRequest),
}
).then((res) => res.json());
setMsg({ msg: 'Preparing resync...', color: 'base' });
const resyncRes = await fetch('/api/v1/mirrors/resync', {
method: 'POST',
body: JSON.stringify({
flowJobName: mirrorName,
dropStats: true,
} as ResyncMirrorRequest),
}).then((res) => res.json());
if (resyncRes.ok !== true) {
setMsg(
`Unable to drop mirror ${mirrorName}. ${resyncRes.errorMessage ?? ''}`
);
return false;
setMsg({
msg: `Unable to resync mirror ${mirrorName}. ${resyncRes.message ?? ''}`,
color: 'destructive',
});
setSyncing(false);
return;
}
setMsg('Mirror resynced successfully');
window.location.reload();
return true;
setMsg({
msg: 'Resync has been initiated. You may reload this window to see the progress.',
color: 'positive',
});
setSyncing(false);
return;
};

return (
Expand All @@ -63,21 +68,31 @@ export const ResyncDialog = ({ mirrorName }: ResyncDialogProps) => {
<Label as='label' variant='body' style={{ marginTop: '0.3rem' }}>
Are you sure you want to resync this mirror?
<br></br>
This involves <b>dropping the existing mirror</b> and recreating it.
This involves <b>dropping the existing mirror</b> and recreating it
with initial load.
</Label>
<div style={{ display: 'flex', alignItems: 'center' }}>
{syncing && <DotLoader size={15} />}
<Label as='label' style={{ marginTop: '0.3rem' }}>
{msg}
<Label
as='label'
style={{ marginTop: '0.3rem' }}
colorName='lowContrast'
colorSet={msg.color}
>
{msg.msg}
</Label>
</div>
<div style={{ display: 'flex', marginTop: '1rem' }}>
<DialogClose>
<Button style={{ backgroundColor: '#6c757d', color: 'white' }}>
Cancel
<Button
disabled={syncing}
style={{ backgroundColor: '#6c757d', color: 'white' }}
>
{msg.color === 'positive' ? 'Close' : 'Cancel'}
</Button>
</DialogClose>
<Button
disabled={syncing}
onClick={handleResync}
variant='normalSolid'
style={{
Expand Down

0 comments on commit e9e012c

Please sign in to comment.