Skip to content

Commit

Permalink
feat(batch): initial cursor implementation (#15968)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Apr 2, 2024
1 parent 5897c22 commit f975030
Show file tree
Hide file tree
Showing 15 changed files with 566 additions and 15 deletions.
95 changes: 95 additions & 0 deletions e2e_test/batch/transaction/cursor.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table a(aid int, c1 int);

statement ok
insert into a values(1, 11), (2, 12), (3, 13);

statement ok
create table test(a int,b varchar);

statement ok
insert into test values(1, 'hello'), (2, 'world'), (3, 'risingwave'), (4, 'labs');

# Currently, we allow declaring cursors out of TRANSACTION, because we don't have RW txn support. In PG, it's not allowed
statement ok
DECLARE
test_cursor CURSOR FOR
SELECT * FROM test where a > 2 ORDER BY a;

statement ok
CLOSE test_cursor;

statement ok
START TRANSACTION ISOLATION LEVEL REPEATABLE READ;

statement ok
DECLARE
test_cursor CURSOR FOR
SELECT * FROM test where a > 2 ORDER BY a;

statement error cursor "test_cursor" already exists
DECLARE
test_cursor CURSOR FOR
SELECT * FROM test where a > 2;

statement error table or source not found: test
DECLARE
test_cursor CURSOR FOR
SELECT * FROM test_non_existent where a > 2;

statement error cursor "test_cursor_non_existent" does not exist
FETCH NEXT from test_cursor_non_existent;

query II
FETCH NEXT from test_cursor;
----
3 risingwave

query II
FETCH NEXT from test_cursor;
----
4 labs

query II
FETCH NEXT from test_cursor;
----

statement error cursor "test_cursor_non_existent" does not exist
CLOSE test_cursor_non_existent;

statement ok
CLOSE test_cursor;

statement error cursor "test_cursor" does not exist
FETCH NEXT from test_cursor;

statement ok
DECLARE
test_cursor CURSOR FOR
SELECT * FROM test JOIN a ON test.a > 1 and a.aid = test.a ORDER BY test.a;

query IIII
FETCH NEXT from test_cursor;
----
2 world 2 12

query IIII
FETCH NEXT from test_cursor;
----
3 risingwave 3 13

query IIII
FETCH NEXT from test_cursor;
----

statement ok
COMMIT;

statement ok
drop table test;

statement ok
drop table a;
87 changes: 87 additions & 0 deletions e2e_test/batch/transaction/cursor_multi_conn.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table test(a int,b varchar);

statement ok
insert into test values(1, 'hello'), (2, 'world'), (3, 'risingwave');

statement ok
START TRANSACTION ISOLATION LEVEL REPEATABLE READ;

statement ok
DECLARE
test_cursor CURSOR FOR
SELECT * FROM test where a > 2 ORDER BY a;

query II
FETCH NEXT from test_cursor;
----
3 risingwave

connection other
statement ok
flush;

connection other
statement ok
insert into test values(4, 'labs');

connection other
statement ok
flush;

connection other
query II
SELECT * FROM test where a > 2 ORDER BY a;
----
3 risingwave
4 labs

connection other
statement ok
START TRANSACTION ISOLATION LEVEL REPEATABLE READ;

connection other
statement ok
DECLARE
test_cursor CURSOR FOR
SELECT * FROM test where a > 2 ORDER BY a;

connection other
query II
FETCH NEXT from test_cursor;
----
3 risingwave

connection other
query II
FETCH NEXT from test_cursor;
----
4 labs

connection other
query II
FETCH NEXT from test_cursor;
----

connection other
statement ok
COMMIT;

query II
FETCH NEXT from test_cursor;
----

statement ok
COMMIT;

query II
SELECT * FROM test where a > 2 ORDER BY a;
----
3 risingwave
4 labs

statement ok
drop table test;
14 changes: 12 additions & 2 deletions src/frontend/planner_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use anyhow::{anyhow, bail, Result};
pub use resolve_id::*;
use risingwave_frontend::handler::util::SourceSchemaCompatExt;
use risingwave_frontend::handler::{
create_index, create_mv, create_schema, create_source, create_table, create_view, drop_table,
explain, variable, HandlerArgs,
close_cursor, create_index, create_mv, create_schema, create_source, create_table, create_view,
declare_cursor, drop_table, explain, fetch_cursor, variable, HandlerArgs,
};
use risingwave_frontend::session::SessionImpl;
use risingwave_frontend::test_utils::{create_proto_file, get_explain_output, LocalFrontend};
Expand Down Expand Up @@ -570,6 +570,16 @@ impl TestCase {
create_schema::handle_create_schema(handler_args, schema_name, if_not_exists)
.await?;
}
Statement::DeclareCursor { cursor_name, query } => {
declare_cursor::handle_declare_cursor(handler_args, cursor_name, *query)
.await?;
}
Statement::FetchCursor { cursor_name, count } => {
fetch_cursor::handle_fetch_cursor(handler_args, cursor_name, count).await?;
}
Statement::CloseCursor { cursor_name } => {
close_cursor::handle_close_cursor(handler_args, cursor_name).await?;
}
_ => return Err(anyhow!("Unsupported statement type")),
}
}
Expand Down
32 changes: 32 additions & 0 deletions src/frontend/src/handler/close_cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_sqlparser::ast::ObjectName;

use super::RwPgResponse;
use crate::error::Result;
use crate::handler::HandlerArgs;

pub async fn handle_close_cursor(
handler_args: HandlerArgs,
cursor_name: Option<ObjectName>,
) -> Result<RwPgResponse> {
if let Some(name) = cursor_name {
handler_args.session.drop_cursor(name).await?;
} else {
handler_args.session.drop_all_cursors().await;
}
Ok(PgResponse::empty_result(StatementType::CLOSE_CURSOR))
}
49 changes: 49 additions & 0 deletions src/frontend/src/handler/declare_cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_sqlparser::ast::{ObjectName, Query, Statement};

use super::query::{gen_batch_plan_by_statement, gen_batch_plan_fragmenter};
use super::RwPgResponse;
use crate::error::Result;
use crate::handler::HandlerArgs;
use crate::session::cursor::Cursor;
use crate::OptimizerContext;

pub async fn handle_declare_cursor(
handler_args: HandlerArgs,
cursor_name: ObjectName,
query: Query,
) -> Result<RwPgResponse> {
let session = handler_args.session.clone();

let plan_fragmenter_result = {
let context = OptimizerContext::from_handler_args(handler_args);
let plan_result = gen_batch_plan_by_statement(
&session,
context.into(),
Statement::Query(Box::new(query.clone())),
)?;
gen_batch_plan_fragmenter(&session, plan_result)?
};

session
.add_cursor(
cursor_name,
Cursor::new(plan_fragmenter_result, session.clone()).await?,
)
.await?;
Ok(PgResponse::empty_result(StatementType::DECLARE_CURSOR))
}
32 changes: 32 additions & 0 deletions src/frontend/src/handler/fetch_cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_sqlparser::ast::ObjectName;

use super::RwPgResponse;
use crate::error::Result;
use crate::handler::HandlerArgs;

pub async fn handle_fetch_cursor(
handler_args: HandlerArgs,
cursor_name: ObjectName,
count: Option<i32>,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let (rows, pg_descs) = session.cursor_next(&cursor_name, count).await?;
Ok(PgResponse::builder(StatementType::FETCH_CURSOR)
.values(rows.into(), pg_descs)
.into())
}
17 changes: 15 additions & 2 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_sqlparser::ast::*;

use self::util::{DataChunkToRowSetAdapter, SourceSchemaCompatExt};
use self::variable::handle_set_time_zone;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result};
use crate::handler::cancel_job::handle_cancel;
Expand All @@ -50,6 +49,7 @@ mod alter_table_column;
mod alter_table_with_sr;
pub mod alter_user;
pub mod cancel_job;
pub mod close_cursor;
mod comment;
pub mod create_connection;
mod create_database;
Expand All @@ -65,6 +65,7 @@ pub mod create_table;
pub mod create_table_as;
pub mod create_user;
pub mod create_view;
pub mod declare_cursor;
pub mod describe;
mod drop_connection;
mod drop_database;
Expand All @@ -80,6 +81,7 @@ pub mod drop_user;
mod drop_view;
pub mod explain;
pub mod extended_handle;
pub mod fetch_cursor;
mod flush;
pub mod handle_privilege;
mod kill_process;
Expand Down Expand Up @@ -509,7 +511,9 @@ pub async fn handle(
variable,
value,
} => variable::handle_set(handler_args, variable, value),
Statement::SetTimeZone { local: _, value } => handle_set_time_zone(handler_args, value),
Statement::SetTimeZone { local: _, value } => {
variable::handle_set_time_zone(handler_args, value)
}
Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
Statement::CreateIndex {
name,
Expand Down Expand Up @@ -920,6 +924,15 @@ pub async fn handle(
object_name,
comment,
} => comment::handle_comment(handler_args, object_type, object_name, comment).await,
Statement::DeclareCursor { cursor_name, query } => {
declare_cursor::handle_declare_cursor(handler_args, cursor_name, *query).await
}
Statement::FetchCursor { cursor_name, count } => {
fetch_cursor::handle_fetch_cursor(handler_args, cursor_name, count).await
}
Statement::CloseCursor { cursor_name } => {
close_cursor::handle_close_cursor(handler_args, cursor_name).await
}
_ => bail_not_implemented!("Unhandled statement: {}", stmt),
}
}
Loading

0 comments on commit f975030

Please sign in to comment.