Skip to content

Commit

Permalink
removing deprecated params from QRep (#1154)
Browse files Browse the repository at this point in the history
### ⚠️ This change can break existing QRep mirrors!

Removes `QRepSyncMode`, `batch_size_int` and `batch_duration_seconds`
  • Loading branch information
heavycrystal authored Jan 25, 2024
1 parent 56457d1 commit 94cb719
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 73 deletions.
2 changes: 0 additions & 2 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,6 @@ type SyncRecordsRequest struct {
Records *CDCRecordStream
// FlowJobName is the name of the flow job.
FlowJobName string
// SyncMode to use for pushing raw records
SyncMode protos.QRepSyncMode
// source:destination mappings
TableMappings []*protos.TableMapping
// Staging path for AVRO files in CDC
Expand Down
29 changes: 9 additions & 20 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,6 @@ message PartitionRange {
}

// protos for qrep
enum QRepSyncMode {
QREP_SYNC_MODE_MULTI_INSERT = 0;
QREP_SYNC_MODE_STORAGE_AVRO = 1;
}

enum QRepWriteType {
QREP_WRITE_MODE_APPEND = 0;
QREP_WRITE_MODE_UPSERT = 1;
Expand All @@ -276,42 +271,36 @@ message QRepConfig {
string watermark_column = 7;

bool initial_copy_only = 8;
QRepSyncMode sync_mode = 9;

// DEPRECATED: eliminate when breaking changes are allowed.
uint32 batch_size_int = 10;
// DEPRECATED: eliminate when breaking changes are allowed.
uint32 batch_duration_seconds = 11;

uint32 max_parallel_workers = 12;
uint32 max_parallel_workers = 9;

// time to wait between getting partitions to process
uint32 wait_between_batches_seconds = 13;
uint32 wait_between_batches_seconds = 10;

QRepWriteMode write_mode = 14;
QRepWriteMode write_mode = 11;

// This is only used when sync_mode is AVRO
// this is the location where the avro files will be written
// if this starts with gs:// then it will be written to GCS
// if this starts with s3:// then it will be written to S3, only supported in Snowflake
// if nothing is specified then it will be written to local disk
// if using GCS or S3 make sure your instance has the correct permissions.
string staging_path = 15;
string staging_path = 12;

// This setting overrides batch_size_int and batch_duration_seconds
// and instead uses the number of rows per partition to determine
// how many rows to process per batch.
uint32 num_rows_per_partition = 16;
uint32 num_rows_per_partition = 13;

// Creates the watermark table on the destination as-is, can be used for some queries.
bool setup_watermark_table_on_destination = 17;
bool setup_watermark_table_on_destination = 14;

// create new tables with "_peerdb_resync" suffix, perform initial load and then swap the new table with the old ones
// to be used after the old mirror is dropped
bool dst_table_full_resync = 18;
bool dst_table_full_resync = 15;

string synced_at_col_name = 19;
string soft_delete_col_name = 20;
string synced_at_col_name = 16;
string soft_delete_col_name = 17;
}

message QRepPartition {
Expand Down
3 changes: 1 addition & 2 deletions ui/app/mirrors/create/cdc/cdc.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
'use client';
import { QRepSyncMode } from '@/grpc_generated/flow';
import { DBType } from '@/grpc_generated/peers';
import { Button } from '@/lib/Button';
import { Icon } from '@/lib/Icon';
Expand Down Expand Up @@ -39,7 +38,7 @@ export default function CDCConfigForm({
}: MirrorConfigProps) {
const [show, setShow] = useState(false);
const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean | QRepSyncMode = val;
let stateVal: string | boolean = val;
setting.stateHandler(stateVal, setter);
};

Expand Down
29 changes: 0 additions & 29 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
import {
FlowConnectionConfigs,
QRepConfig,
QRepSyncMode,
QRepWriteType,
} from '@/grpc_generated/flow';
import { DBType, Peer, dBTypeToJSON } from '@/grpc_generated/peers';
Expand All @@ -28,28 +27,6 @@ export const handlePeer = (
) => {
if (!peer) return;
if (peerEnd === 'dst') {
if (peer.type === DBType.POSTGRES) {
setConfig((curr) => {
return {
...curr,
cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
syncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT,
};
});
} else if (
peer.type === DBType.SNOWFLAKE ||
peer.type === DBType.BIGQUERY
) {
setConfig((curr) => {
return {
...curr,
cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO,
snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO,
syncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO,
};
});
}
setConfig((curr) => ({
...curr,
destination: peer,
Expand Down Expand Up @@ -238,12 +215,6 @@ export const handleCreateQRep = async (
config.flowJobName = flowJobName;
config.query = query;

if (config.destinationPeer?.type == DBType.POSTGRES) {
config.syncMode = QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT;
} else {
config.syncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
}

setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch(
'/api/mirrors/qrep',
Expand Down
8 changes: 2 additions & 6 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import {
FlowConnectionConfigs,
QRepSyncMode,
QRepWriteType,
} from '@/grpc_generated/flow';
import { FlowConnectionConfigs, QRepWriteType } from '@/grpc_generated/flow';
import { Peer } from '@/grpc_generated/peers';

export interface MirrorSetting {
label: string;
stateHandler: (
value: string | string[] | Peer | boolean | QRepSyncMode | QRepWriteType,
value: string | string[] | Peer | boolean | QRepWriteType,
setter: any
) => void;
type?: string;
Expand Down
5 changes: 2 additions & 3 deletions ui/app/mirrors/create/qrep/qrep.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use client';
import { RequiredIndicator } from '@/components/RequiredIndicator';
import { QRepConfig, QRepSyncMode, QRepWriteType } from '@/grpc_generated/flow';
import { QRepConfig, QRepWriteType } from '@/grpc_generated/flow';
import { DBType } from '@/grpc_generated/peers';
import { Label } from '@/lib/Label';
import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout';
Expand Down Expand Up @@ -51,8 +51,7 @@ export default function QRepConfigForm({
const [loading, setLoading] = useState(false);

const handleChange = (val: string | boolean, setting: MirrorSetting) => {
let stateVal: string | boolean | QRepSyncMode | QRepWriteType | string[] =
val;
let stateVal: string | boolean | QRepWriteType | string[] = val;
if (setting.label.includes('Write Type')) {
switch (val) {
case 'Upsert':
Expand Down
12 changes: 1 addition & 11 deletions ui/app/mirrors/edit/[mirrorId]/configValues.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
import { FlowConnectionConfigs, QRepSyncMode } from '@/grpc_generated/flow';
import { FlowConnectionConfigs } from '@/grpc_generated/flow';

const syncModeToLabel = (mode: QRepSyncMode) => {
switch (mode.toString()) {
case 'QREP_SYNC_MODE_STORAGE_AVRO':
return 'AVRO';
case 'QREP_SYNC_MODE_MULTI_INSERT':
return 'Copy with Binary';
default:
return 'AVRO';
}
};
const MirrorValues = (mirrorConfig: FlowConnectionConfigs | undefined) => {
return [
{
Expand Down

0 comments on commit 94cb719

Please sign in to comment.