Skip to content

Commit

Permalink
support format and others
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Feb 21, 2024
1 parent dbb6d66 commit 4192f18
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 125 deletions.
5 changes: 3 additions & 2 deletions src/common/src/util/stream_graph_visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use itertools::Itertools;
use risingwave_pb::catalog::{table, Table};
use risingwave_pb::catalog::Table;
use risingwave_pb::stream_plan::stream_fragment_graph::StreamFragment;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{agg_call_state, StreamNode};
Expand Down Expand Up @@ -198,7 +198,8 @@ pub fn visit_stream_node_tables_inner<F>(
NodeBody::Subscription(node) => {
// A Subscription should have a log store, and name == subscription's name.
optional!(node.log_store_table, "Subscription");
node.log_store_table.as_mut().unwrap().name = node.subscription_catalog.clone().unwrap().name;
node.log_store_table.as_mut().unwrap().name =
node.subscription_catalog.clone().unwrap().name;
}

// Now
Expand Down
42 changes: 25 additions & 17 deletions src/frontend/src/handler/declare_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use pgwire::types::Format;
use risingwave_sqlparser::ast::{DeclareCursorStatement, Ident, ObjectName, Statement};

use super::query::handle_query;
use super::util::gen_query_from_table_name;
use super::util::{
convert_epoch_to_logstore_i64, gen_query_from_logstore_ge_rw_timestamp,
gen_query_from_table_name,
};
use super::{HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
use crate::session::cursor_manager::Cursor;
Expand All @@ -42,26 +45,31 @@ pub async fn handle_declare_cursor(
let is_snapshot = start_rw_timestamp == 0;
let subscription =
session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?;
let retention_seconds = subscription.get_retention_seconds()?;
if is_snapshot {
// let retention_seconds = subscription.get_retention_seconds()?;
let (start_rw_timestamp, res) = if is_snapshot {
let subscription_from_table_name = ObjectName(vec![Ident::from(
subscription.subscription_from_name.as_ref(),
)]);
let query_stmt = Statement::Query(Box::new(gen_query_from_table_name(
subscription_from_table_name,
)?));
let res = handle_query(handle_args, query_stmt, formats).await?;
let start_rw_timestamp = res.query_with_snapshot().ok_or_else(|| {
ErrorCode::InternalError("Fetch can't find snapshot epoch".to_string())
})?;
(convert_epoch_to_logstore_i64(start_rw_timestamp), res)
} else {
todo!()
}
let subscription_from_table_name = ObjectName(vec![Ident::from(
subscription.subscription_from_name.as_ref(),
)]);
let query_stmt = Statement::Query(Box::new(gen_query_from_table_name(
subscription_from_table_name,
)?));

let res = handle_query(handle_args, query_stmt, formats).await?;
let start_rw_timestamp:u64 = res.query_with_snapshot().ok_or_else(||{
ErrorCode::InternalError("Fetch can't find snapshot epoch".to_string())
})?;
let start_rw_timestamp = convert_epoch_to_logstore_i64(start_rw_timestamp);
let query_stmt =
gen_query_from_logstore_ge_rw_timestamp(stmt.cursor_from.clone(), start_rw_timestamp)?;
let res = handle_query(handle_args, query_stmt, formats).await?;
(start_rw_timestamp, res)
};
let cursor = Cursor::new(
cursor_name.clone(),
res,
start_rw_timestamp as i64 ^ (1i64 << 63),
start_rw_timestamp,
is_snapshot,
true,
stmt.cursor_from.clone(),
)
Expand Down
59 changes: 34 additions & 25 deletions src/frontend/src/handler/fetch_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::{Format, Row};
use risingwave_common::util::epoch::Epoch;
use risingwave_sqlparser::ast::FetchCursorStatement;
use risingwave_sqlparser::parser::Parser;
use risingwave_storage::table::TableIter;

use super::query::handle_query;
use super::util::gen_query_from_logstore_ge_rw_timestamp;
use super::{HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
use crate::session::cursor_manager::Cursor;
use crate::session::cursor_manager::{Cursor, CursorRowValue};
use crate::{Binder, PgResponseStream};

pub async fn handle_fetch_cursor(
Expand All @@ -41,45 +39,56 @@ pub async fn handle_fetch_cursor(
.get_row_with_cursor(cursor_name.clone())
.await?
{
crate::session::cursor_manager::CursorRowValue::Row((row, pg_descs)) => {
CursorRowValue::Row((row, pg_descs)) => {
return Ok(build_fetch_cursor_response(vec![row], pg_descs));
}
crate::session::cursor_manager::CursorRowValue::NextQuery(
rw_timestamp,
subscription_name,
) => {
println!("{}",rw_timestamp);
let sql_str = format!(
"SELECT * FROM {} WHERE kv_log_store_epoch >= {} AND kv_log_store_row_op != 6 ORDER BY kv_log_store_epoch",
subscription_name, rw_timestamp
);
let query_stmt = Parser::parse_sql(&sql_str)
.map_err(|err| {
ErrorCode::InternalError(format!("Parse fetch to select error: {}", err))
})?
.pop()
.ok_or_else(|| ErrorCode::InternalError("Can't get fetch statement".to_string()))?;
CursorRowValue::QueryWithNextRwTimestamp(rw_timestamp, subscription_name) => {
let query_stmt =
gen_query_from_logstore_ge_rw_timestamp(subscription_name.clone(), rw_timestamp)?;
let res = handle_query(handle_args, query_stmt, formats).await?;

let cursor = Cursor::new(
cursor_name.clone(),
res,
rw_timestamp - 1,
rw_timestamp,
false,
true,
subscription_name.clone(),
)
.await?;
cursor_manager.update_cursor(cursor)?;
}
}
CursorRowValue::QueryWithStartRwTimestamp(rw_timestamp, subscription_name) => {
let query_stmt = gen_query_from_logstore_ge_rw_timestamp(
subscription_name.clone(),
rw_timestamp + 1,
)?;
let res = handle_query(handle_args, query_stmt, formats).await?;

let cursor = Cursor::new(
cursor_name.clone(),
res,
rw_timestamp,
false,
false,
subscription_name.clone(),
)
.await?;
cursor_manager.update_cursor(cursor)?;
}
};

match cursor_manager.get_row_with_cursor(cursor_name).await? {
crate::session::cursor_manager::CursorRowValue::Row((row, pg_descs)) => {
CursorRowValue::Row((row, pg_descs)) => {
Ok(build_fetch_cursor_response(vec![row], pg_descs))
}
crate::session::cursor_manager::CursorRowValue::NextQuery(_, _) => {
CursorRowValue::QueryWithStartRwTimestamp(_, _) => {
Ok(build_fetch_cursor_response(vec![], vec![]))
}
CursorRowValue::QueryWithNextRwTimestamp(_, _) => Err(ErrorCode::InternalError(
"Fetch cursor, one must get a row or null".to_string(),
)
.into()),
}
}

Expand Down
56 changes: 35 additions & 21 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,26 +340,34 @@ async fn execute(
// Used in counting row count.
let first_field_format = formats.first().copied().unwrap_or(Format::Text);

let (mut row_stream,query_with_snapshot) = match query_mode {
let (mut row_stream, query_with_snapshot) = match query_mode {
QueryMode::Auto => unreachable!(),
QueryMode::Local => {
let (chunk_stream, query_with_snapshot) = local_execute(session.clone(), query, can_timeout_cancel).await?;
(PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
chunk_stream,
column_types,
formats,
session.clone(),
)),query_with_snapshot)
},
let (chunk_stream, query_with_snapshot) =
local_execute(session.clone(), query, can_timeout_cancel).await?;
(
PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
chunk_stream,
column_types,
formats,
session.clone(),
)),
query_with_snapshot,
)
}
// Local mode do not support cancel tasks.
QueryMode::Distributed => {
let (chunk_stream, query_with_snapshot) = distribute_execute(session.clone(), query, can_timeout_cancel).await?;
(PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
chunk_stream,
column_types,
formats,
session.clone(),
)), query_with_snapshot)
let (chunk_stream, query_with_snapshot) =
distribute_execute(session.clone(), query, can_timeout_cancel).await?;
(
PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
chunk_stream,
column_types,
formats,
session.clone(),
)),
query_with_snapshot,
)
}
};

Expand Down Expand Up @@ -459,7 +467,7 @@ async fn distribute_execute(
session: Arc<SessionImpl>,
query: Query,
can_timeout_cancel: bool,
) -> Result<(DistributedQueryStream,u64)> {
) -> Result<(DistributedQueryStream, u64)> {
let timeout = if cfg!(madsim) {
None
} else if can_timeout_cancel {
Expand All @@ -482,7 +490,7 @@ async fn local_execute(
session: Arc<SessionImpl>,
query: Query,
can_timeout_cancel: bool,
) -> Result<(LocalQueryStream,u64)> {
) -> Result<(LocalQueryStream, u64)> {
let timeout = if cfg!(madsim) {
None
} else if can_timeout_cancel {
Expand All @@ -496,8 +504,14 @@ async fn local_execute(
let snapshot = session.pinned_snapshot();

// TODO: Passing sql here
let execution =
LocalQueryExecution::new(query, front_env.clone(), "", snapshot.clone(), session, timeout);
let execution = LocalQueryExecution::new(
query,
front_env.clone(),
"",
snapshot.clone(),
session,
timeout,
);

Ok((execution.stream_rows(),snapshot.epoch().0))
Ok((execution.stream_rows(), snapshot.epoch().0))
}
26 changes: 25 additions & 1 deletion src/frontend/src/handler/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_connector::source::KAFKA_CONNECTOR;
use risingwave_sqlparser::ast::{
display_comma_separated, CompatibleSourceSchema, ConnectorSchema, ObjectName, Query, Select,
SelectItem, SetExpr, TableFactor, TableWithJoins,
SelectItem, SetExpr, Statement, TableFactor, TableWithJoins,
};
use risingwave_sqlparser::parser::Parser;

use crate::catalog::IndexCatalog;
use crate::error::{ErrorCode, Result as RwResult};
Expand Down Expand Up @@ -309,6 +310,29 @@ pub fn gen_query_from_table_name(from_name: ObjectName) -> crate::error::Result<
})
}

pub fn gen_query_from_logstore_ge_rw_timestamp(
logstore_name: ObjectName,
rw_timestamp: i64,
) -> crate::error::Result<Statement> {
let sql_str = format!(
"SELECT * FROM {} WHERE kv_log_store_epoch >= {} AND kv_log_store_row_op != 6 AND kv_log_store_row_op != 5 ORDER BY kv_log_store_epoch",
logstore_name, rw_timestamp
);
let query_stmt = Parser::parse_sql(&sql_str)
.map_err(|err| ErrorCode::InternalError(format!("Parse fetch to select error: {}", err)))?
.pop()
.ok_or_else(|| ErrorCode::InternalError("Can't get fetch statement".to_string()))?;
Ok(query_stmt)
}

pub fn convert_epoch_to_logstore_i64(epoch: u64) -> i64 {
epoch as i64 ^ (1i64 << 63)
}

pub fn convert_logstore_i64_to_epoch(logstore_i64: i64) -> u64 {
logstore_i64 as u64 ^ (1u64 << 63)
}

#[cfg(test)]
mod tests {
use bytes::BytesMut;
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/scheduler/distributed/query_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ impl QueryManager {
&self,
context: ExecutionContextRef,
query: Query,
) -> SchedulerResult<(DistributedQueryStream,u64)> {
) -> SchedulerResult<(DistributedQueryStream, u64)> {
if let Some(query_limit) = self.disrtibuted_query_limit
&& self.query_metrics.running_query_num.get() as u64 == query_limit
{
Expand Down Expand Up @@ -204,7 +204,10 @@ impl QueryManager {
.delete_query(&query_id);
err
})?;
Ok((query_result_fetcher.stream_from_channel(),pinned_snapshot.epoch().0))
Ok((
query_result_fetcher.stream_from_channel(),
pinned_snapshot.epoch().0,
))
}

pub fn cancel_queries_in_session(&self, session_id: SessionId) {
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ impl SessionImpl {
notices: Default::default(),
exec_context: Mutex::new(None),
last_idle_instant: Default::default(),
cursor_manager: Arc::new(tokio::sync::Mutex::new(CursorManager::new())),
cursor_manager: Arc::new(tokio::sync::Mutex::new(CursorManager::default())),
}
}

Expand All @@ -625,7 +625,7 @@ impl SessionImpl {
))
.into(),
last_idle_instant: Default::default(),
cursor_manager: Arc::new(tokio::sync::Mutex::new(CursorManager::new())),
cursor_manager: Arc::new(tokio::sync::Mutex::new(CursorManager::default())),
}
}

Expand Down
Loading

0 comments on commit 4192f18

Please sign in to comment.