Skip to content

Commit

Permalink
feat(frontend): support fetch n from subscription cursor (#16764)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored May 30, 2024
1 parent e154a37 commit 4bef086
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 52 deletions.
113 changes: 93 additions & 20 deletions e2e_test/subscription/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ def execute_insert(sql,conn):
conn.commit()
cur.close()

def check_rows_data(expect_vec,rows,status):
row = rows[0]
def check_rows_data(expect_vec,row,status):
value_len = len(row)
for index, value in enumerate(row):
if index == value_len - 1:
Expand All @@ -56,7 +55,7 @@ def test_cursor_snapshot():

execute_insert("declare cur subscription cursor for sub",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row,1)
check_rows_data([1,2],row[0],1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
Expand All @@ -75,17 +74,17 @@ def test_cursor_snapshot_log_store():

execute_insert("declare cur subscription cursor for sub",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row,1)
check_rows_data([1,2],row[0],1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,1)
check_rows_data([4,4],row[0],1)
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5],row,1)
check_rows_data([5,5],row[0],1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
Expand All @@ -109,11 +108,11 @@ def test_cursor_since_begin():
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,1)
check_rows_data([4,4],row[0],1)
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5],row,1)
check_rows_data([5,5],row[0],1)
row = execute_query("fetch next from cur",conn)
check_rows_data([6,6],row,1)
check_rows_data([6,6],row[0],1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
Expand All @@ -138,7 +137,7 @@ def test_cursor_since_now():
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([6,6],row,1)
check_rows_data([6,6],row[0],1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
Expand All @@ -164,27 +163,27 @@ def test_cursor_since_rw_timestamp():
row = execute_query("fetch next from cur",conn)
valuelen = len(row[0])
rw_timestamp_1 = row[0][valuelen - 1]
check_rows_data([4,4],row,1)
check_rows_data([4,4],row[0],1)
row = execute_query("fetch next from cur",conn)
valuelen = len(row[0])
rw_timestamp_2 = row[0][valuelen - 1] - 1
check_rows_data([5,5],row,1)
check_rows_data([5,5],row[0],1)
row = execute_query("fetch next from cur",conn)
valuelen = len(row[0])
rw_timestamp_3 = row[0][valuelen - 1] + 1
check_rows_data([6,6],row,1)
check_rows_data([6,6],row[0],1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,1)
check_rows_data([4,4],row[0],1)
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5],row,1)
check_rows_data([5,5],row[0],1)
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn)
Expand All @@ -206,7 +205,7 @@ def test_cursor_op():

execute_insert("declare cur subscription cursor for sub",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row,1)
check_rows_data([1,2],row[0],1)
row = execute_query("fetch next from cur",conn)
assert row == []

Expand All @@ -215,28 +214,102 @@ def test_cursor_op():
execute_insert("update t1 set v2 = 10 where v1 = 4",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,1)
check_rows_data([4,4],row[0],1)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,4)
check_rows_data([4,4],row[0],4)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,10],row,3)
check_rows_data([4,10],row[0],3)
row = execute_query("fetch next from cur",conn)
assert row == []

execute_insert("delete from t1 where v1 = 4",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,10],row,2)
check_rows_data([4,10],row[0],2)
row = execute_query("fetch next from cur",conn)
assert row == []

execute_insert("close cur",conn)
drop_table_subscription()

def test_cursor_with_table_alter():
print(f"test_cursor_with_table_alter")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
execute_insert("alter table t1 add v3 int",conn)
execute_insert("insert into t1 values(4,4,4)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row[0],1)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4,4],row[0],1)
execute_insert("insert into t1 values(5,5,5)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5,5],row[0],1)
execute_insert("alter table t1 drop column v2",conn)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([6,6],row[0],1)
drop_table_subscription()

def test_cursor_fetch_n():
print(f"test_cursor_with_table_alter")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(7,7)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(8,8)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(9,9)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(10,10)",conn)
execute_insert("flush",conn)
execute_insert("update t1 set v2 = 100 where v1 = 10",conn)
execute_insert("flush",conn)
row = execute_query("fetch 6 from cur",conn)
assert len(row) == 6
check_rows_data([1,2],row[0],1)
check_rows_data([4,4],row[1],1)
check_rows_data([5,5],row[2],1)
check_rows_data([6,6],row[3],1)
check_rows_data([7,7],row[4],1)
check_rows_data([8,8],row[5],1)
row = execute_query("fetch 6 from cur",conn)
assert len(row) == 4
check_rows_data([9,9],row[0],1)
check_rows_data([10,10],row[1],1)
check_rows_data([10,10],row[2],4)
check_rows_data([10,100],row[3],3)
drop_table_subscription()

if __name__ == "__main__":
test_cursor_snapshot()
test_cursor_op()
test_cursor_snapshot_log_store()
test_cursor_since_rw_timestamp()
test_cursor_since_now()
test_cursor_since_begin()
test_cursor_with_table_alter()
test_cursor_fetch_n()
3 changes: 1 addition & 2 deletions src/frontend/src/handler/declare_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ async fn handle_declare_subscription_cursor(
let cursor_from_subscription_name = sub_name.0.last().unwrap().real_value().clone();
let subscription =
session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?;
let table = session.get_table_by_id(&subscription.dependent_table_id)?;
// Start the first query of cursor, which includes querying the table and querying the subscription's logstore
let start_rw_timestamp = match rw_timestamp {
Some(risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp)) => {
Expand All @@ -81,8 +80,8 @@ async fn handle_declare_subscription_cursor(
.add_subscription_cursor(
cursor_name.clone(),
start_rw_timestamp,
subscription.dependent_table_id,
subscription,
table,
&handle_args,
)
.await?;
Expand Down
13 changes: 13 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/log_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,19 @@ impl LogScan {
Schema { fields }
}

pub(crate) fn schema_without_table_name(&self) -> Schema {
let mut fields: Vec<_> = self
.output_col_idx
.iter()
.map(|tb_idx| {
let col = &self.table_desc.columns[*tb_idx];
Field::from(col)
})
.collect();
fields.push(Field::with_name(OP_TYPE, OP_NAME));
Schema { fields }
}

pub(crate) fn ctx(&self) -> OptimizerContextRef {
self.ctx.clone()
}
Expand Down
Loading

0 comments on commit 4bef086

Please sign in to comment.