Skip to content

Commit

Permalink
remove psql table entries in snapshot mode (#7645)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 18efd754dcb81f002090b128e4bb9a064cbb15af
  • Loading branch information
zxqfd555-pw authored and Manul from Pathway committed Nov 13, 2024
1 parent e7cee98 commit 584d53a
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Changed
- `pw.Table.diff` now supports setting the `instance` parameter, which allows computing differences for multiple groups.
- `pw.io.postgres.write_snapshot` now keeps the Postgres table fully in sync with the current state of the table in Pathway. This means that if an entry is deleted in Pathway, the same entry will also be deleted from the Postgres table managed by the output connector.

## [0.15.3] - 2024-11-07

Expand Down
113 changes: 69 additions & 44 deletions src/connectors/data_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1681,53 +1681,78 @@ impl Formatter for PsqlSnapshotFormatter {

let mut result = Vec::new();

let update_condition = self
.key_field_positions
.iter()
.map(|position| {
format!(
"{}.{}=${}",
self.table_name,
self.value_field_names[*position],
*position + 1
)
})
.join(" AND ");
if diff == 1 {
let update_condition = self
.key_field_positions
.iter()
.map(|position| {
format!(
"{}.{}=${}",
self.table_name,
self.value_field_names[*position],
*position + 1
)
})
.join(" AND ");

let update_pairs = self
.value_field_positions
.iter()
.map(|position| format!("{}=${}", self.value_field_names[*position], *position + 1))
.join(",");
let update_pairs = self
.value_field_positions
.iter()
.map(|position| format!("{}=${}", self.value_field_names[*position], *position + 1))
.join(",");
writeln!(
result,
"INSERT INTO {} ({},time,diff) VALUES ({},{},{}) ON CONFLICT ({}) DO UPDATE SET {},time={},diff={} WHERE {}",
self.table_name, // INSERT INTO ...
self.value_field_names.iter().format(","), // (...
(1..=values.len()).format_with(",", |x, f| f(&format_args!("${x}"))), // VALUES(...
time, // VALUES(..., time
diff, // VALUES(..., time, diff
self.key_field_names.iter().join(","), // ON CONFLICT(...
update_pairs, // DO UPDATE SET ...
time,
diff,
update_condition, // WHERE ...
)
.unwrap();

writeln!(
result,
"INSERT INTO {} ({},time,diff) VALUES ({},{},{}) ON CONFLICT ({}) DO UPDATE SET {},time={},diff={} WHERE {} AND ({}.time<{} OR ({}.time={} AND {}.diff=-1))",
self.table_name, // INSERT INTO ...
self.value_field_names.iter().format(","), // (...
(1..=values.len()).format_with(",", |x, f| f(&format_args!("${x}"))), // VALUES(...
time, // VALUES(..., time
diff, // VALUES(..., time, diff
self.key_field_names.iter().join(","), // ON CONFLICT(...
update_pairs, // DO UPDATE SET ...
time,
diff,
update_condition, // WHERE ...
self.table_name, // AND ...time
time, // .time < ...
self.table_name, // OR (...time
time, // .time=...
self.table_name, // AND ...diff=-1))
)
.unwrap();
Ok(FormatterContext::new_single_payload(
result,
*key,
values.to_vec(),
time,
diff,
))
} else {
let mut tokens = Vec::new();
let mut key_part_values = Vec::new();
for (name, position) in self
.key_field_names
.iter()
.zip(self.key_field_positions.iter())
{
key_part_values.push(values[*position].clone());
tokens.push(format!(
"{name}={}",
format_args!("${}", key_part_values.len())
));
}
writeln!(
result,
"DELETE FROM {} WHERE {}",
self.table_name,
tokens.join(" AND "),
)
.unwrap();

Ok(FormatterContext::new_single_payload(
result,
*key,
values.to_vec(),
time,
diff,
))
Ok(FormatterContext::new_single_payload(
result,
*key,
take(&mut key_part_values),
time,
diff,
))
}
}
}

Expand Down
31 changes: 25 additions & 6 deletions tests/integration/test_psql_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use pathway_engine::engine::{Key, Timestamp, Value};
use super::helpers::assert_document_raw_byte_contents;

#[test]
fn test_psql_format_snapshot_command() -> eyre::Result<()> {
let formatter = PsqlSnapshotFormatter::new(
fn test_psql_format_snapshot_commands() -> eyre::Result<()> {
let mut formatter = PsqlSnapshotFormatter::new(
"table_name".to_string(),
vec!["key".to_string()],
vec![
Expand All @@ -20,9 +20,28 @@ fn test_psql_format_snapshot_command() -> eyre::Result<()> {
"value_bool".to_string(),
"value_float".to_string(),
],
)?;

let result = formatter.format(
&Key::for_value(&Value::from("1")),
&[
Value::from("k"),
Value::from("string"),
Value::Bool(true),
Value::from(1.23),
],
Timestamp(5),
1,
)?;

assert_eq!(result.payloads.len(), 1);
assert_document_raw_byte_contents(
&result.payloads[0],
b"INSERT INTO table_name (key,value_string,value_bool,value_float,time,diff) VALUES ($1,$2,$3,$4,5,1) ON CONFLICT (key) DO UPDATE SET value_string=$2,value_bool=$3,value_float=$4,time=5,diff=1 WHERE table_name.key=$1\n"
);
assert_eq!(result.values.len(), 4);

let result = formatter?.format(
let result = formatter.format(
&Key::for_value(&Value::from("1")),
&[
Value::from("k"),
Expand All @@ -37,9 +56,9 @@ fn test_psql_format_snapshot_command() -> eyre::Result<()> {
assert_eq!(result.payloads.len(), 1);
assert_document_raw_byte_contents(
&result.payloads[0],
b"INSERT INTO table_name (key,value_string,value_bool,value_float,time,diff) VALUES ($1,$2,$3,$4,5,-1) ON CONFLICT (key) DO UPDATE SET value_string=$2,value_bool=$3,value_float=$4,time=5,diff=-1 WHERE table_name.key=$1 AND (table_name.time<5 OR (table_name.time=5 AND table_name.diff=-1))\n"
b"DELETE FROM table_name WHERE key=$1\n",
);
assert_eq!(result.values.len(), 4);
assert_eq!(result.values.len(), 1);

Ok(())
}
Expand Down Expand Up @@ -87,7 +106,7 @@ fn test_psql_format_snapshot_composite() -> eyre::Result<()> {
assert_eq!(result.payloads.len(), 1);
assert_document_raw_byte_contents(
&result.payloads[0],
b"INSERT INTO table_name (key,value_string,value_bool,value_float,time,diff) VALUES ($1,$2,$3,$4,5,1) ON CONFLICT (key,value_float) DO UPDATE SET value_string=$2,value_bool=$3,time=5,diff=1 WHERE table_name.key=$1 AND table_name.value_float=$4 AND (table_name.time<5 OR (table_name.time=5 AND table_name.diff=-1))\n"
b"INSERT INTO table_name (key,value_string,value_bool,value_float,time,diff) VALUES ($1,$2,$3,$4,5,1) ON CONFLICT (key,value_float) DO UPDATE SET value_string=$2,value_bool=$3,time=5,diff=1 WHERE table_name.key=$1 AND table_name.value_float=$4\n"
);
assert_eq!(result.values.len(), 4);

Expand Down

0 comments on commit 584d53a

Please sign in to comment.