diff --git a/flow/model/model.go b/flow/model/model.go index a01a6b29e0..b008e624f1 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -517,8 +517,6 @@ type SyncRecordsRequest struct { Records *CDCRecordStream // FlowJobName is the name of the flow job. FlowJobName string - // SyncMode to use for pushing raw records - SyncMode protos.QRepSyncMode // source:destination mappings TableMappings []*protos.TableMapping // Staging path for AVRO files in CDC diff --git a/protos/flow.proto b/protos/flow.proto index 3a54ff2727..41ba8bf966 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -245,11 +245,6 @@ message PartitionRange { } // protos for qrep -enum QRepSyncMode { - QREP_SYNC_MODE_MULTI_INSERT = 0; - QREP_SYNC_MODE_STORAGE_AVRO = 1; -} - enum QRepWriteType { QREP_WRITE_MODE_APPEND = 0; QREP_WRITE_MODE_UPSERT = 1; @@ -276,19 +271,13 @@ message QRepConfig { string watermark_column = 7; bool initial_copy_only = 8; - QRepSyncMode sync_mode = 9; - // DEPRECATED: eliminate when breaking changes are allowed. - uint32 batch_size_int = 10; - // DEPRECATED: eliminate when breaking changes are allowed. - uint32 batch_duration_seconds = 11; - - uint32 max_parallel_workers = 12; + uint32 max_parallel_workers = 9; // time to wait between getting partitions to process - uint32 wait_between_batches_seconds = 13; + uint32 wait_between_batches_seconds = 10; - QRepWriteMode write_mode = 14; + QRepWriteMode write_mode = 11; // This is only used when sync_mode is AVRO // this is the location where the avro files will be written @@ -296,22 +285,22 @@ message QRepConfig { // if this starts with s3:// then it will be written to S3, only supported in Snowflake // if nothing is specified then it will be written to local disk // if using GCS or S3 make sure your instance has the correct permissions. - string staging_path = 15; + string staging_path = 12; // This setting overrides batch_size_int and batch_duration_seconds // and instead uses the number of rows per partition to determine // how many rows to process per batch. - uint32 num_rows_per_partition = 16; + uint32 num_rows_per_partition = 13; // Creates the watermark table on the destination as-is, can be used for some queries. - bool setup_watermark_table_on_destination = 17; + bool setup_watermark_table_on_destination = 14; // create new tables with "_peerdb_resync" suffix, perform initial load and then swap the new table with the old ones // to be used after the old mirror is dropped - bool dst_table_full_resync = 18; + bool dst_table_full_resync = 15; - string synced_at_col_name = 19; - string soft_delete_col_name = 20; + string synced_at_col_name = 16; + string soft_delete_col_name = 17; } message QRepPartition { diff --git a/ui/app/mirrors/create/cdc/cdc.tsx b/ui/app/mirrors/create/cdc/cdc.tsx index 9101703c20..db26cefd51 100644 --- a/ui/app/mirrors/create/cdc/cdc.tsx +++ b/ui/app/mirrors/create/cdc/cdc.tsx @@ -1,5 +1,4 @@ 'use client'; -import { QRepSyncMode } from '@/grpc_generated/flow'; import { DBType } from '@/grpc_generated/peers'; import { Button } from '@/lib/Button'; import { Icon } from '@/lib/Icon'; @@ -39,7 +38,7 @@ export default function CDCConfigForm({ }: MirrorConfigProps) { const [show, setShow] = useState(false); const handleChange = (val: string | boolean, setting: MirrorSetting) => { - let stateVal: string | boolean | QRepSyncMode = val; + let stateVal: string | boolean = val; setting.stateHandler(stateVal, setter); }; diff --git a/ui/app/mirrors/create/handlers.ts b/ui/app/mirrors/create/handlers.ts index 00dab7347b..a3fdab3178 100644 --- a/ui/app/mirrors/create/handlers.ts +++ b/ui/app/mirrors/create/handlers.ts @@ -8,7 +8,6 @@ import { import { FlowConnectionConfigs, QRepConfig, - QRepSyncMode, QRepWriteType, } from '@/grpc_generated/flow'; import { DBType, Peer, dBTypeToJSON } from '@/grpc_generated/peers'; @@ -28,28 +27,6 @@ export const handlePeer = ( ) => { if (!peer) return; if (peerEnd === 'dst') { - if (peer.type === DBType.POSTGRES) { - setConfig((curr) => { - return { - ...curr, - cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - syncMode: QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT, - }; - }); - } else if ( - peer.type === DBType.SNOWFLAKE || - peer.type === DBType.BIGQUERY - ) { - setConfig((curr) => { - return { - ...curr, - cdcSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - snapshotSyncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - syncMode: QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO, - }; - }); - } setConfig((curr) => ({ ...curr, destination: peer, @@ -238,12 +215,6 @@ export const handleCreateQRep = async ( config.flowJobName = flowJobName; config.query = query; - if (config.destinationPeer?.type == DBType.POSTGRES) { - config.syncMode = QRepSyncMode.QREP_SYNC_MODE_MULTI_INSERT; - } else { - config.syncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO; - } - setLoading(true); const statusMessage: UCreateMirrorResponse = await fetch( '/api/mirrors/qrep', diff --git a/ui/app/mirrors/create/helpers/common.ts b/ui/app/mirrors/create/helpers/common.ts index 1af51e8d85..5d1b8d9a93 100644 --- a/ui/app/mirrors/create/helpers/common.ts +++ b/ui/app/mirrors/create/helpers/common.ts @@ -1,14 +1,10 @@ -import { - FlowConnectionConfigs, - QRepSyncMode, - QRepWriteType, -} from '@/grpc_generated/flow'; +import { FlowConnectionConfigs, QRepWriteType } from '@/grpc_generated/flow'; import { Peer } from '@/grpc_generated/peers'; export interface MirrorSetting { label: string; stateHandler: ( - value: string | string[] | Peer | boolean | QRepSyncMode | QRepWriteType, + value: string | string[] | Peer | boolean | QRepWriteType, setter: any ) => void; type?: string; diff --git a/ui/app/mirrors/create/qrep/qrep.tsx b/ui/app/mirrors/create/qrep/qrep.tsx index bfabd6a3ce..06b0379b47 100644 --- a/ui/app/mirrors/create/qrep/qrep.tsx +++ b/ui/app/mirrors/create/qrep/qrep.tsx @@ -1,6 +1,6 @@ 'use client'; import { RequiredIndicator } from '@/components/RequiredIndicator'; -import { QRepConfig, QRepSyncMode, QRepWriteType } from '@/grpc_generated/flow'; +import { QRepConfig, QRepWriteType } from '@/grpc_generated/flow'; import { DBType } from '@/grpc_generated/peers'; import { Label } from '@/lib/Label'; import { RowWithSelect, RowWithSwitch, RowWithTextField } from '@/lib/Layout'; @@ -51,8 +51,7 @@ export default function QRepConfigForm({ const [loading, setLoading] = useState(false); const handleChange = (val: string | boolean, setting: MirrorSetting) => { - let stateVal: string | boolean | QRepSyncMode | QRepWriteType | string[] = - val; + let stateVal: string | boolean | QRepWriteType | string[] = val; if (setting.label.includes('Write Type')) { switch (val) { case 'Upsert': diff --git a/ui/app/mirrors/edit/[mirrorId]/configValues.ts b/ui/app/mirrors/edit/[mirrorId]/configValues.ts index dd5d5bdb97..8d49f3090f 100644 --- a/ui/app/mirrors/edit/[mirrorId]/configValues.ts +++ b/ui/app/mirrors/edit/[mirrorId]/configValues.ts @@ -1,15 +1,5 @@ -import { FlowConnectionConfigs, QRepSyncMode } from '@/grpc_generated/flow'; +import { FlowConnectionConfigs } from '@/grpc_generated/flow'; -const syncModeToLabel = (mode: QRepSyncMode) => { - switch (mode.toString()) { - case 'QREP_SYNC_MODE_STORAGE_AVRO': - return 'AVRO'; - case 'QREP_SYNC_MODE_MULTI_INSERT': - return 'Copy with Binary'; - default: - return 'AVRO'; - } -}; const MirrorValues = (mirrorConfig: FlowConnectionConfigs | undefined) => { return [ {