Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parameter for cdc with only snapshot #884

Merged
merged 2 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,153 changes: 582 additions & 571 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ func CDCFlowWorkflowWithConfig(

state.SnapshotComplete = true
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")
// if initial_copy_only is opted for, we end the flow here.
if cfg.InitialCopyOnly {
return nil, nil
}
}

syncFlowOptions := &protos.SyncFlowOptions{
Expand Down
10 changes: 10 additions & 0 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
_ => None,
};

let initial_copy_only = match raw_options.remove("initial_copy_only") {
Some(sqlparser::ast::Value::Boolean(b)) => *b,
_ => false,
};

let flow_job = FlowJob {
name: cdc.mirror_name.to_string().to_lowercase(),
source_peer: cdc.source_peer.to_string().to_lowercase(),
Expand All @@ -327,8 +332,13 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
resync,
soft_delete_col_name,
synced_at_col_name,
initial_copy_only,
};

if initial_copy_only && !do_initial_copy {
anyhow::bail!("initial_copy_only is set to true, but do_initial_copy is set to false");
}

Ok(Some(PeerDDL::CreateMirrorForCDC {
if_not_exists: *if_not_exists,
flow_job,
Expand Down
1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl FlowGrpcClient {
resync: job.resync,
soft_delete_col_name: job.soft_delete_col_name.clone().unwrap_or_default(),
synced_at_col_name: job.synced_at_col_name.clone().unwrap_or_default(),
initial_copy_only: job.initial_copy_only,
..Default::default()
};

Expand Down
1 change: 1 addition & 0 deletions nexus/pt/src/flow_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct FlowJob {
pub resync: bool,
pub soft_delete_col_name: Option<String>,
pub synced_at_col_name: Option<String>,
pub initial_copy_only: bool,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub struct FlowConnectionConfigs {
pub soft_delete_col_name: ::prost::alloc::string::String,
#[prost(string, tag="25")]
pub synced_at_col_name: ::prost::alloc::string::String,
#[prost(bool, tag="26")]
pub initial_copy_only: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,9 @@ impl serde::Serialize for FlowConnectionConfigs {
if !self.synced_at_col_name.is_empty() {
len += 1;
}
if self.initial_copy_only {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.FlowConnectionConfigs", len)?;
if let Some(v) = self.source.as_ref() {
struct_ser.serialize_field("source", v)?;
Expand Down Expand Up @@ -1309,6 +1312,9 @@ impl serde::Serialize for FlowConnectionConfigs {
if !self.synced_at_col_name.is_empty() {
struct_ser.serialize_field("syncedAtColName", &self.synced_at_col_name)?;
}
if self.initial_copy_only {
struct_ser.serialize_field("initialCopyOnly", &self.initial_copy_only)?;
}
struct_ser.end()
}
}
Expand Down Expand Up @@ -1366,6 +1372,8 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
"softDeleteColName",
"synced_at_col_name",
"syncedAtColName",
"initial_copy_only",
"initialCopyOnly",
];

#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -1395,6 +1403,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
Resync,
SoftDeleteColName,
SyncedAtColName,
InitialCopyOnly,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -1442,6 +1451,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
"resync" => Ok(GeneratedField::Resync),
"softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
"syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
"initialCopyOnly" | "initial_copy_only" => Ok(GeneratedField::InitialCopyOnly),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand Down Expand Up @@ -1486,6 +1496,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
let mut resync__ = None;
let mut soft_delete_col_name__ = None;
let mut synced_at_col_name__ = None;
let mut initial_copy_only__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Source => {
Expand Down Expand Up @@ -1655,6 +1666,12 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
}
synced_at_col_name__ = Some(map.next_value()?);
}
GeneratedField::InitialCopyOnly => {
if initial_copy_only__.is_some() {
return Err(serde::de::Error::duplicate_field("initialCopyOnly"));
}
initial_copy_only__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand Down Expand Up @@ -1686,6 +1703,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
resync: resync__.unwrap_or_default(),
soft_delete_col_name: soft_delete_col_name__.unwrap_or_default(),
synced_at_col_name: synced_at_col_name__.unwrap_or_default(),
initial_copy_only: initial_copy_only__.unwrap_or_default(),
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ message FlowConnectionConfigs {

string soft_delete_col_name = 24;
string synced_at_col_name = 25;

bool initial_copy_only = 26;
}

message RenameTableOption {
Expand Down
9 changes: 9 additions & 0 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ export const handleCreateCDC = async (
config.cdcSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
config.snapshotSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
}

if (config.doInitialCopy == false && config.initialCopyOnly == true) {
setMsg({
ok: false,
msg: 'Initial Copy Only cannot be true if Initial Copy is false.',
});
return;
}

setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch('/api/mirrors/cdc', {
method: 'POST',
Expand Down
11 changes: 11 additions & 0 deletions ui/app/mirrors/create/helpers/cdc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,15 @@ export const cdcSettings: MirrorSetting[] = [
default: 'SQL',
type: 'switch',
},
{
label: 'Initial Copy Only',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
initialCopyOnly: (value as boolean) || false,
})),
tips: 'If set, PeerDB will only perform initial load and will not perform CDC sync.',
type: 'switch',
advanced: true,
},
];
1 change: 1 addition & 0 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export const blankCDCSetting: FlowConnectionConfigs = {
resync: false,
softDeleteColName: '',
syncedAtColName: '',
initialCopyOnly: false,
};

export const blankQRepSetting = {
Expand Down
17 changes: 17 additions & 0 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export interface FlowConnectionConfigs {
resync: boolean;
softDeleteColName: string;
syncedAtColName: string;
initialCopyOnly: boolean;
}

export interface FlowConnectionConfigs_SrcTableIdNameMappingEntry {
Expand Down Expand Up @@ -872,6 +873,7 @@ function createBaseFlowConnectionConfigs(): FlowConnectionConfigs {
resync: false,
softDeleteColName: "",
syncedAtColName: "",
initialCopyOnly: false,
};
}

Expand Down Expand Up @@ -954,6 +956,9 @@ export const FlowConnectionConfigs = {
if (message.syncedAtColName !== "") {
writer.uint32(202).string(message.syncedAtColName);
}
if (message.initialCopyOnly === true) {
writer.uint32(208).bool(message.initialCopyOnly);
}
return writer;
},

Expand Down Expand Up @@ -1145,6 +1150,13 @@ export const FlowConnectionConfigs = {

message.syncedAtColName = reader.string();
continue;
case 26:
if (tag !== 208) {
break;
}

message.initialCopyOnly = reader.bool();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand Down Expand Up @@ -1199,6 +1211,7 @@ export const FlowConnectionConfigs = {
resync: isSet(object.resync) ? Boolean(object.resync) : false,
softDeleteColName: isSet(object.softDeleteColName) ? String(object.softDeleteColName) : "",
syncedAtColName: isSet(object.syncedAtColName) ? String(object.syncedAtColName) : "",
initialCopyOnly: isSet(object.initialCopyOnly) ? Boolean(object.initialCopyOnly) : false,
};
},

Expand Down Expand Up @@ -1291,6 +1304,9 @@ export const FlowConnectionConfigs = {
if (message.syncedAtColName !== "") {
obj.syncedAtColName = message.syncedAtColName;
}
if (message.initialCopyOnly === true) {
obj.initialCopyOnly = message.initialCopyOnly;
}
return obj;
},

Expand Down Expand Up @@ -1346,6 +1362,7 @@ export const FlowConnectionConfigs = {
message.resync = object.resync ?? false;
message.softDeleteColName = object.softDeleteColName ?? "";
message.syncedAtColName = object.syncedAtColName ?? "";
message.initialCopyOnly = object.initialCopyOnly ?? false;
return message;
},
};
Expand Down
Loading