Skip to content

Commit

Permalink
feat(frontend): support cursor. (#15180)
Browse files Browse the repository at this point in the history
Co-authored-by: William Wen <[email protected]>
Co-authored-by: Patrick Huang <[email protected]>
  • Loading branch information
3 people authored Apr 18, 2024
1 parent a365c03 commit c2827c7
Show file tree
Hide file tree
Showing 40 changed files with 1,506 additions and 339 deletions.
5 changes: 5 additions & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ sqllogictest -p 4566 -d dev './e2e_test/ttl/ttl.slt'
sqllogictest -p 4566 -d dev './e2e_test/database/prepare.slt'
sqllogictest -p 4566 -d test './e2e_test/database/test.slt'

echo "--- e2e, $mode, subscription"
python3 -m pip install --break-system-packages psycopg2-binary
sqllogictest -p 4566 -d dev './e2e_test/subscription/check_sql_statement.slt'
python3 ./e2e_test/subscription/main.py

echo "--- e2e, $mode, Apache Superset"
sqllogictest -p 4566 -d dev './e2e_test/superset/*.slt' --junit "batch-${profile}"

Expand Down
8 changes: 4 additions & 4 deletions e2e_test/batch/transaction/cursor.slt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ DECLARE
test_cursor CURSOR FOR
SELECT * FROM test where a > 2 ORDER BY a;

statement error cursor "test_cursor" already exists
statement error
DECLARE
test_cursor CURSOR FOR
SELECT * FROM test where a > 2;
Expand All @@ -40,7 +40,7 @@ DECLARE
test_cursor CURSOR FOR
SELECT * FROM test_non_existent where a > 2;

statement error cursor "test_cursor_non_existent" does not exist
statement error
FETCH NEXT from test_cursor_non_existent;

query II
Expand All @@ -57,13 +57,13 @@ query II
FETCH NEXT from test_cursor;
----

statement error cursor "test_cursor_non_existent" does not exist
statement error
CLOSE test_cursor_non_existent;

statement ok
CLOSE test_cursor;

statement error cursor "test_cursor" does not exist
statement error
FETCH NEXT from test_cursor;

statement ok
Expand Down
50 changes: 50 additions & 0 deletions e2e_test/subscription/check_sql_statement.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
statement ok
create table t1 (v1 int, v2 int, v3 int);

statement ok
insert into t1 values (1,2), (2,3);

statement ok
create subscription sub from t1 with(retention = '1D');

statement ok
declare cur subscription cursor for sub;

statement ok
declare cur1 subscription cursor for sub since now();

statement ok
declare cur2 subscription cursor for sub since proctime();

statement ok
declare cur3 subscription cursor for sub since begin();

statement error
declare cur4 subscription cursor for sub since 1;

statement error
declare cur5 subscription cursor for sub since asd;

statement error
declare cur6 subscription cursor for sub since 18446744073709551615;

statement error
declare cur subscription cursor for sub;

statement ok
close cur;

statement ok
close cur1;

statement ok
close cur2;

statement ok
close cur3;

statement ok
drop subscription sub;

statement ok
drop table t1;
8 changes: 8 additions & 0 deletions e2e_test/subscription/create_table_and_subscription.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
statement ok
create table t1 (v1 int, v2 int);

statement ok
insert into t1 values (1,2);

statement ok
create subscription sub from t1 with(retention = '1D');
5 changes: 5 additions & 0 deletions e2e_test/subscription/drop_table_and_subscription.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
statement ok
drop subscription sub;

statement ok
drop table t1;
238 changes: 238 additions & 0 deletions e2e_test/subscription/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import subprocess
import psycopg2
import time


def execute_slt(slt):
if slt is None or slt == "":
return
cmd = f"sqllogictest -p 4566 -d dev {slt}"
print(f"Command line is [{cmd}]")
subprocess.run(cmd,
shell=True,
check=True)
time.sleep(3)

def create_table_subscription():
execute_slt("./e2e_test/subscription/create_table_and_subscription.slt")

def drop_table_subscription():
execute_slt("./e2e_test/subscription/drop_table_and_subscription.slt")

def execute_query(sql,conn):
cur = conn.cursor()
cur.execute(sql)
conn.commit()
rows = cur.fetchall()
cur.close()
return rows

def execute_insert(sql,conn):
cur = conn.cursor()
cur.execute(sql)
conn.commit()
cur.close()

def check_rows_data(expect_vec,rows,status):
row = rows[0]
for index, value in enumerate(row):
if index == 0:
continue
if index == 1:
assert value == status,f"expect {value} but got {status}"
continue
assert value == expect_vec[index-2],f"expect {expect_vec[index-2]} but got {value}"

def test_cursor_snapshot():
print(f"test_cursor_snapshot")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row,1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()


def test_cursor_snapshot_log_store():
print(f"test_cursor_snapshot_log_store")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row,1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,1)
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5],row,1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()

def test_cursor_since_begin():
print(f"test_cursor_since_begin")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
execute_insert("flush",conn)
execute_insert("declare cur subscription cursor for sub since begin()",conn)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,1)
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5],row,1)
row = execute_query("fetch next from cur",conn)
check_rows_data([6,6],row,1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()

def test_cursor_since_now():
print(f"test_cursor_since_now")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
execute_insert("flush",conn)
execute_insert("declare cur subscription cursor for sub since now()",conn)
time.sleep(2)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([6,6],row,1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)
drop_table_subscription()

def test_cursor_since_rw_timestamp():
print(f"test_cursor_since_rw_timestamp")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("insert into t1 values(5,5)",conn)
execute_insert("flush",conn)
execute_insert("declare cur subscription cursor for sub since begin()",conn)
execute_insert("insert into t1 values(6,6)",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
rw_timestamp_1 = row[0][0]
check_rows_data([4,4],row,1)
row = execute_query("fetch next from cur",conn)
rw_timestamp_2 = row[0][0] - 1
check_rows_data([5,5],row,1)
row = execute_query("fetch next from cur",conn)
rw_timestamp_3 = row[0][0] + 1
check_rows_data([6,6],row,1)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_1}",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,1)
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_2}",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([5,5],row,1)
execute_insert("close cur",conn)

execute_insert(f"declare cur subscription cursor for sub since {rw_timestamp_3}",conn)
row = execute_query("fetch next from cur",conn)
assert row == []
execute_insert("close cur",conn)

drop_table_subscription()

def test_cursor_op():
print(f"test_cursor_op")
create_table_subscription()
conn = psycopg2.connect(
host="localhost",
port="4566",
user="root",
database="dev"
)

execute_insert("declare cur subscription cursor for sub",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([1,2],row,1)
row = execute_query("fetch next from cur",conn)
assert row == []

execute_insert("insert into t1 values(4,4)",conn)
execute_insert("flush",conn)
execute_insert("update t1 set v2 = 10 where v1 = 4",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,1)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,4],row,4)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,10],row,3)
row = execute_query("fetch next from cur",conn)
assert row == []

execute_insert("delete from t1 where v1 = 4",conn)
execute_insert("flush",conn)
row = execute_query("fetch next from cur",conn)
check_rows_data([4,10],row,2)
row = execute_query("fetch next from cur",conn)
assert row == []

execute_insert("close cur",conn)
drop_table_subscription()

if __name__ == "__main__":
test_cursor_snapshot()
test_cursor_op()
test_cursor_snapshot_log_store()
test_cursor_since_rw_timestamp()
test_cursor_since_now()
test_cursor_since_begin()
1 change: 1 addition & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ message Subscription {
optional string created_at_cluster_version = 16;

string subscription_from_name = 17;
optional string subscription_internal_table_name = 18;
}

message Connection {
Expand Down
7 changes: 7 additions & 0 deletions src/common/src/catalog/internal_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ pub fn valid_table_name(table_name: &str) -> bool {
!INTERNAL_TABLE_NAME.is_match(table_name)
}

pub fn is_subscription_internal_table(subscription_name: &str, table_name: &str) -> bool {
let regex =
Regex::new(format!(r"__internal_{}_(\d+)_subscription_(\d+)", subscription_name).as_str())
.unwrap();
regex.is_match(table_name)
}

pub fn get_dist_key_in_pk_indices<I: Eq + Copy + Debug, O: TryFrom<usize>>(
dist_key_indices: &[I],
pk_indices: &[I],
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub fn visit_stream_node_tables_inner<F>(

// Subscription
NodeBody::Subscription(node) => {
// A Subscription should have a state table.
// A Subscription should have a log store
optional!(node.log_store_table, "Subscription")
}

Expand Down
Loading

0 comments on commit c2827c7

Please sign in to comment.