From d9df1f085b4790174524ac7519ed2fbdff301405 Mon Sep 17 00:00:00 2001
From: zwang28 <84491488@qq.com>
Date: Sat, 10 Dec 2022 19:30:04 +0800
Subject: [PATCH] feat(frontend): support setting query epoch

---
 e2e_test/batch/basic/query.slt.part           | 15 ++++
 src/common/src/session_config/mod.rs          | 70 ++++++++++++++++++-
 src/frontend/src/handler/query.rs             | 23 ++++--
 .../src/scheduler/distributed/query.rs        | 11 ++-
 .../scheduler/distributed/query_manager.rs    |  4 +-
 .../src/scheduler/hummock_snapshot_manager.rs | 26 ++++++-
 src/frontend/src/scheduler/local.rs           |  7 +-
 7 files changed, 134 insertions(+), 22 deletions(-)

diff --git a/e2e_test/batch/basic/query.slt.part b/e2e_test/batch/basic/query.slt.part
index 5bc8365228d08..37e4f1ba07f12 100644
--- a/e2e_test/batch/basic/query.slt.part
+++ b/e2e_test/batch/basic/query.slt.part
@@ -30,6 +30,21 @@ select count(*) from t3;
 ----
 1
 
+statement ok
+SET QUERY_EPOCH TO 1;
+
+query III
+select t3.* from t3;
+----
+
+statement ok
+SET QUERY_EPOCH TO 0;
+
+query III
+select t3.* from t3;
+----
+1 2 NULL
+
 statement ok
 drop table t3;
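Note on the values used in the test above: a RisingWave epoch is a u64 that encodes physical time in its upper bits, so QUERY_EPOCH = 1 denotes a point far in the past (before the row was inserted) and the first query returns nothing, while 0 means "unset" and restores reads of the latest data. A minimal sketch of that relationship, assuming the 16-bit low-order sequence part used by risingwave_common::util::epoch at the time; the helper names are illustrative, not the crate's API:

// ASSUMED layout: | 48 bits physical milliseconds | 16 bits sequence |
const EPOCH_PHYSICAL_SHIFT: u64 = 16;

// Hypothetical helpers mapping between epochs and physical milliseconds.
fn epoch_from_millis(physical_millis: u64) -> u64 {
    physical_millis << EPOCH_PHYSICAL_SHIFT
}

fn millis_from_epoch(epoch: u64) -> u64 {
    epoch >> EPOCH_PHYSICAL_SHIFT
}

fn main() {
    let epoch = epoch_from_millis(1_000_000);
    assert_eq!(millis_from_epoch(epoch), 1_000_000);
    // `SET QUERY_EPOCH TO 1` therefore points at physical time ~0,
    // earlier than any committed write.
    assert_eq!(millis_from_epoch(1), 0);
}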
diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs
index a567079a21d8d..9a875d8f55a13 100644
--- a/src/common/src/session_config/mod.rs
+++ b/src/common/src/session_config/mod.rs
@@ -24,10 +24,11 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
 
 use crate::error::{ErrorCode, RwError};
 use crate::session_config::transaction_isolation_level::IsolationLevel;
+use crate::util::epoch::Epoch;
 
 // This is a hack, &'static str is not allowed as a const generics argument.
 // TODO: refine this using the adt_const_params feature.
-const CONFIG_KEYS: [&str; 10] = [
+const CONFIG_KEYS: [&str; 11] = [
     "RW_IMPLICIT_FLUSH",
     "CREATE_COMPACTION_GROUP_FOR_MV",
     "QUERY_MODE",
@@ -38,6 +39,7 @@ const CONFIG_KEYS: [&str; 10] = [
     "MAX_SPLIT_RANGE_GAP",
     "SEARCH_PATH",
     "TRANSACTION ISOLATION LEVEL",
+    "QUERY_EPOCH",
 ];
 
 // MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] = "RW_IMPLICIT_FLUSH"
@@ -52,6 +54,7 @@ const BATCH_ENABLE_LOOKUP_JOIN: usize = 6;
 const MAX_SPLIT_RANGE_GAP: usize = 7;
 const SEARCH_PATH: usize = 8;
 const TRANSACTION_ISOLATION_LEVEL: usize = 9;
+const QUERY_EPOCH: usize = 10;
 
 trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
     fn entry_name() -> &'static str;
@@ -184,6 +187,51 @@ impl<const NAME: usize, const DEFAULT: i32> TryFrom<&[&str]> for ConfigI32<NAME
     }
 }
 
+struct ConfigI64<const NAME: usize, const DEFAULT: i64 = 0>(i64);
+
+impl<const NAME: usize, const DEFAULT: i64> Default for ConfigI64<NAME, DEFAULT> {
+    fn default() -> Self {
+        ConfigI64(DEFAULT)
+    }
+}
+
+impl<const NAME: usize, const DEFAULT: i64> Deref for ConfigI64<NAME, DEFAULT> {
+    type Target = i64;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<const NAME: usize, const DEFAULT: i64> ConfigEntry for ConfigI64<NAME, DEFAULT> {
+    fn entry_name() -> &'static str {
+        CONFIG_KEYS[NAME]
+    }
+}
+
+impl<const NAME: usize, const DEFAULT: i64> TryFrom<&[&str]> for ConfigI64<NAME, DEFAULT> {
+    type Error = RwError;
+
+    fn try_from(value: &[&str]) -> Result<Self, Self::Error> {
+        if value.len() != 1 {
+            return Err(ErrorCode::InternalError(format!(
+                "SET {} takes only one argument",
+                Self::entry_name()
+            ))
+            .into());
+        }
+
+        let s = value[0];
+        s.parse::<i64>().map(ConfigI64).map_err(|_e| {
+            ErrorCode::InvalidConfigValue {
+                config_entry: Self::entry_name().to_string(),
+                config_value: s.to_string(),
+            }
+            .into()
+        })
+    }
+}
+
 pub struct VariableInfo {
     pub name: String,
     pub setting: String,
@@ -198,6 +246,7 @@ type ExtraFloatDigit = ConfigI32<EXTRA_FLOAT_DIGITS>;
 type DateStyle = ConfigString<DATE_STYLE>;
 type BatchEnableLookupJoin = ConfigBool<BATCH_ENABLE_LOOKUP_JOIN>;
 type MaxSplitRangeGap = ConfigI32<MAX_SPLIT_RANGE_GAP>;
+type QueryEpoch = ConfigI64<QUERY_EPOCH, 0>;
 
 #[derive(Default)]
 pub struct ConfigMap {
@@ -234,6 +283,9 @@ pub struct ConfigMap {
     /// see <https://www.postgresql.org/docs/current/transaction-iso.html>
     transaction_isolation_level: IsolationLevel,
+
+    /// Select as of a specific epoch.
+    query_epoch: QueryEpoch,
 }
 
 impl ConfigMap {
@@ -257,6 +309,8 @@ impl ConfigMap {
             self.max_split_range_gap = val.as_slice().try_into()?;
         } else if key.eq_ignore_ascii_case(SearchPath::entry_name()) {
             self.search_path = val.as_slice().try_into()?;
+        } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
+            self.query_epoch = val.as_slice().try_into()?;
         } else {
             return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
         }
@@ -285,6 +339,8 @@ impl ConfigMap {
             Ok(self.search_path.to_string())
         } else if key.eq_ignore_ascii_case(IsolationLevel::entry_name()) {
             Ok(self.transaction_isolation_level.to_string())
+        } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
+            Ok(self.query_epoch.to_string())
         } else {
             Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
         }
@@ -336,6 +392,11 @@ impl ConfigMap {
                 name: SearchPath::entry_name().to_lowercase(),
                 setting : self.search_path.to_string(),
                 description : String::from("Sets the order in which schemas are searched when an object (table, data type, function, etc.) is referenced by a simple name with no schema specified")
+            },
+            VariableInfo {
+                name: QueryEpoch::entry_name().to_lowercase(),
+                setting : self.query_epoch.to_string(),
+                description : String::from("Sets the historical epoch for querying data. If 0, the latest data is queried.")
             }
         ]
     }
@@ -379,4 +440,11 @@ impl ConfigMap {
     pub fn get_search_path(&self) -> SearchPath {
         self.search_path.clone()
     }
+
+    pub fn get_query_epoch(&self) -> Option<Epoch> {
+        if self.query_epoch.0 != 0 {
+            return Some((self.query_epoch.0 as u64).into());
+        }
+        None
+    }
 }
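ConfigI64 above follows the file's existing const-generic pattern: since &'static str cannot be a const generic argument, each entry carries an index into CONFIG_KEYS plus a typed default. A self-contained sketch of how the pieces compose, with simplified stand-in names rather than the crate's real items:

// KEYS/QUERY_EPOCH mirror CONFIG_KEYS and its index constants; everything
// else is pared down for illustration.
const KEYS: [&str; 1] = ["QUERY_EPOCH"];
const QUERY_EPOCH: usize = 0;

struct ConfigI64<const NAME: usize, const DEFAULT: i64 = 0>(i64);

impl<const NAME: usize, const DEFAULT: i64> Default for ConfigI64<NAME, DEFAULT> {
    fn default() -> Self {
        ConfigI64(DEFAULT)
    }
}

impl<const NAME: usize, const DEFAULT: i64> ConfigI64<NAME, DEFAULT> {
    fn entry_name() -> &'static str {
        // NAME indexes into the key table, working around the restriction
        // that &'static str cannot be a const generic argument.
        KEYS[NAME]
    }
}

type QueryEpoch = ConfigI64<QUERY_EPOCH, 0>;

fn main() {
    assert_eq!(QueryEpoch::entry_name(), "QUERY_EPOCH");
    assert_eq!(QueryEpoch::default().0, 0); // default 0 means "unset"
}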
diff --git a/src/frontend/src/handler/query.rs b/src/frontend/src/handler/query.rs
index d3386c1ed760e..e0f1494418215 100644
--- a/src/frontend/src/handler/query.rs
+++ b/src/frontend/src/handler/query.rs
@@ -20,6 +20,7 @@ use itertools::Itertools;
 use pgwire::pg_field_descriptor::PgFieldDescriptor;
 use pgwire::pg_response::{PgResponse, StatementType};
 use postgres_types::FromSql;
+use risingwave_common::bail;
 use risingwave_common::catalog::Schema;
 use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::session_config::QueryMode;
@@ -35,7 +36,7 @@ use crate::planner::Planner;
 use crate::scheduler::plan_fragmenter::Query;
 use crate::scheduler::{
     BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
-    HummockSnapshotGuard, LocalQueryExecution, LocalQueryStream,
+    LocalQueryExecution, LocalQueryStream, QueryHummockSnapshot,
 };
 use crate::session::SessionImpl;
 use crate::PlanRef;
@@ -133,17 +134,27 @@ pub async fn handle_query(
     let hummock_snapshot_manager = session.env().hummock_snapshot_manager();
     let query_id = query.query_id().clone();
     let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?;
-
+    let mut query_snapshot = QueryHummockSnapshot::FrontendPinned(pinned_snapshot);
+    if let Some(query_epoch) = session.config().get_query_epoch() {
+        if query_epoch.0 > query_snapshot.get_committed_epoch() {
+            bail!(
+                "cannot query with future epoch: {}, current committed epoch: {}",
+                query_epoch.0,
+                query_snapshot.get_committed_epoch()
+            );
+        }
+        query_snapshot = QueryHummockSnapshot::Other(query_epoch);
+    }
     match query_mode {
         QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
-            local_execute(session.clone(), query, pinned_snapshot).await?,
+            local_execute(session.clone(), query, query_snapshot).await?,
             column_types,
             format,
         )),
         // Local mode do not support cancel tasks.
         QueryMode::Distributed => {
             PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
-                distribute_execute(session.clone(), query, pinned_snapshot).await?,
+                distribute_execute(session.clone(), query, query_snapshot).await?,
                 column_types,
                 format,
             ))
@@ -224,7 +235,7 @@ fn to_statement_type(stmt: &Statement) -> Result<StatementType> {
 pub async fn distribute_execute(
     session: Arc<SessionImpl>,
     query: Query,
-    pinned_snapshot: HummockSnapshotGuard,
+    pinned_snapshot: QueryHummockSnapshot,
 ) -> Result<DistributedQueryStream> {
     let execution_context: ExecutionContextRef = ExecutionContext::new(session.clone()).into();
     let query_manager = session.env().query_manager().clone();
@@ -238,7 +249,7 @@ pub async fn distribute_execute(
 pub async fn local_execute(
     session: Arc<SessionImpl>,
     query: Query,
-    pinned_snapshot: HummockSnapshotGuard,
+    pinned_snapshot: QueryHummockSnapshot,
 ) -> Result<LocalQueryStream> {
     let front_env = session.env();
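The new branch in handle_query rejects epochs that storage has not committed yet, since no data can exist for them. A standalone sketch of that decision, using plain u64 values and a String error as stand-ins for the crate's Epoch and RwError types:

// Sketch of the future-epoch validation in handle_query; types are stand-ins.
fn resolve_query_epoch(user_epoch: Option<u64>, committed_epoch: u64) -> Result<u64, String> {
    match user_epoch {
        // A user-specified epoch is only valid if storage has already committed it.
        Some(epoch) if epoch > committed_epoch => Err(format!(
            "cannot query with future epoch: {}, current committed epoch: {}",
            epoch, committed_epoch
        )),
        Some(epoch) => Ok(epoch),
        // No QUERY_EPOCH set: read at the pinned snapshot's committed epoch.
        None => Ok(committed_epoch),
    }
}

fn main() {
    assert!(resolve_query_epoch(Some(u64::MAX), 100).is_err());
    assert_eq!(resolve_query_epoch(Some(50), 100), Ok(50));
    assert_eq!(resolve_query_epoch(None, 100), Ok(100));
}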
diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs
index a7c52d92db219..4b58592fa8173 100644
--- a/src/frontend/src/scheduler/distributed/query.rs
+++ b/src/frontend/src/scheduler/distributed/query.rs
@@ -36,8 +36,7 @@ use crate::scheduler::distributed::StageExecution;
 use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID};
 use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
 use crate::scheduler::{
-    ExecutionContextRef, HummockSnapshotGuard, PinnedHummockSnapshot, SchedulerError,
-    SchedulerResult,
+    ExecutionContextRef, QueryHummockSnapshot, SchedulerError, SchedulerResult,
 };
 
 /// Message sent to a `QueryRunner` to control its execution.
@@ -115,7 +114,7 @@ impl QueryExecution {
         &self,
         context: ExecutionContextRef,
         worker_node_manager: WorkerNodeManagerRef,
-        pinned_snapshot: HummockSnapshotGuard,
+        pinned_snapshot: QueryHummockSnapshot,
         compute_client_pool: ComputeClientPoolRef,
         catalog_reader: CatalogReader,
         query_execution_info: QueryExecutionInfoRef,
@@ -189,7 +188,7 @@ impl QueryExecution {
 
     fn gen_stage_executions(
         &self,
-        pinned_snapshot: &PinnedHummockSnapshot,
+        pinned_snapshot: &QueryHummockSnapshot,
         context: ExecutionContextRef,
         worker_node_manager: WorkerNodeManagerRef,
         compute_client_pool: ComputeClientPoolRef,
@@ -225,7 +224,7 @@ impl QueryExecution {
 }
 
 impl QueryRunner {
-    async fn run(mut self, pinned_snapshot: PinnedHummockSnapshot) {
+    async fn run(mut self, pinned_snapshot: QueryHummockSnapshot) {
         // Start leaf stages.
         let leaf_stages = self.query.leaf_stages();
         for stage_id in &leaf_stages {
@@ -424,7 +423,7 @@ pub(crate) mod tests {
             .start(
                 ExecutionContext::new(SessionImpl::mock().into()).into(),
                 worker_node_manager,
-                pinned_snapshot,
+                pinned_snapshot.into(),
                 compute_client_pool,
                 catalog_reader,
                 query_execution_info,
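In the test hunk above the call site only changes to pinned_snapshot.into(); that works because of the From<HummockSnapshotGuard> impl added in hummock_snapshot_manager.rs below, so existing holders of a pinned guard need no other changes. A reduced sketch of the conversion, with stand-in types rather than the crate's real ones:

// Stand-in types showing why `pinned_snapshot.into()` compiles at the call site.
struct SnapshotGuard {
    committed_epoch: u64,
}

enum QuerySnapshot {
    FrontendPinned(SnapshotGuard),
    Other(u64),
}

impl From<SnapshotGuard> for QuerySnapshot {
    fn from(guard: SnapshotGuard) -> Self {
        QuerySnapshot::FrontendPinned(guard)
    }
}

// A function that now takes the enum still accepts a guard via `.into()`.
fn start(snapshot: QuerySnapshot) -> u64 {
    match snapshot {
        QuerySnapshot::FrontendPinned(g) => g.committed_epoch,
        QuerySnapshot::Other(epoch) => epoch,
    }
}

fn main() {
    let guard = SnapshotGuard { committed_epoch: 42 };
    assert_eq!(start(guard.into()), 42);
}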
diff --git a/src/frontend/src/scheduler/distributed/query_manager.rs b/src/frontend/src/scheduler/distributed/query_manager.rs
index c64280707c63f..2c4c327e4f462 100644
--- a/src/frontend/src/scheduler/distributed/query_manager.rs
+++ b/src/frontend/src/scheduler/distributed/query_manager.rs
@@ -34,7 +34,7 @@ use crate::catalog::catalog_service::CatalogReader;
 use crate::scheduler::plan_fragmenter::{Query, QueryId};
 use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
 use crate::scheduler::{
-    ExecutionContextRef, HummockSnapshotGuard, HummockSnapshotManagerRef, SchedulerResult,
+    ExecutionContextRef, HummockSnapshotManagerRef, QueryHummockSnapshot, SchedulerResult,
 };
 
 pub struct DistributedQueryStream {
@@ -160,7 +160,7 @@ impl QueryManager {
         &self,
         context: ExecutionContextRef,
         query: Query,
-        pinned_snapshot: HummockSnapshotGuard,
+        pinned_snapshot: QueryHummockSnapshot,
     ) -> SchedulerResult<DistributedQueryStream> {
         let query_id = query.query_id.clone();
         let query_execution = Arc::new(QueryExecution::new(query, context.session().id()));

diff --git a/src/frontend/src/scheduler/hummock_snapshot_manager.rs b/src/frontend/src/scheduler/hummock_snapshot_manager.rs
index ad0c3cdbf7e3b..88794a626a8d4 100644
--- a/src/frontend/src/scheduler/hummock_snapshot_manager.rs
+++ b/src/frontend/src/scheduler/hummock_snapshot_manager.rs
@@ -19,7 +19,7 @@ use std::time::{Duration, Instant};
 
 use anyhow::anyhow;
 use arc_swap::ArcSwap;
-use risingwave_common::util::epoch::INVALID_EPOCH;
+use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
 use risingwave_pb::hummock::HummockSnapshot;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::sync::oneshot::{channel as once_channel, Sender as Callback};
@@ -32,7 +32,27 @@ use crate::scheduler::{SchedulerError, SchedulerResult};
 const UNPIN_INTERVAL_SECS: u64 = 10;
 
 pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
-pub type PinnedHummockSnapshot = HummockSnapshotGuard;
+pub enum QueryHummockSnapshot {
+    FrontendPinned(HummockSnapshotGuard),
+    /// Some other arbitrary epoch, e.g. one specified by the user.
+    /// Availability and consistency of the underlying data should be guaranteed accordingly.
+    Other(Epoch),
+}
+
+impl QueryHummockSnapshot {
+    pub fn get_committed_epoch(&self) -> u64 {
+        match self {
+            QueryHummockSnapshot::FrontendPinned(s) => s.snapshot.committed_epoch,
+            QueryHummockSnapshot::Other(s) => s.0,
+        }
+    }
+}
+
+impl From<HummockSnapshotGuard> for QueryHummockSnapshot {
+    fn from(s: HummockSnapshotGuard) -> Self {
+        QueryHummockSnapshot::FrontendPinned(s)
+    }
+}
 
 type SnapshotRef = Arc<ArcSwap<HummockSnapshot>>;
 
@@ -165,7 +185,7 @@ impl HummockSnapshotManager {
         }
     }
 
-    pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult<PinnedHummockSnapshot> {
+    pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult<HummockSnapshotGuard> {
         let (sender, rc) = once_channel();
         let msg = EpochOperation::RequestEpoch {
             query_id: query_id.clone(),
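The FrontendPinned variant keeps the guard alive, so the snapshot stays pinned until the query finishes, whereas Other carries a bare epoch that pins nothing, which is what the doc comment on that variant is warning about. A sketch of the release-on-drop mechanics, with a std::sync::mpsc channel standing in for the manager's unpin channel:

use std::sync::mpsc;

// Stand-in for HummockSnapshotGuard: unpins its epoch when dropped.
struct SnapshotGuard {
    committed_epoch: u64,
    unpin_tx: mpsc::Sender<u64>, // stand-in for the manager's unpin channel
}

impl Drop for SnapshotGuard {
    fn drop(&mut self) {
        // The real guard notifies HummockSnapshotManager, which batches
        // unpin work on an interval (UNPIN_INTERVAL_SECS above).
        let _ = self.unpin_tx.send(self.committed_epoch);
    }
}

enum QuerySnapshot {
    FrontendPinned(SnapshotGuard),
    Other(u64), // nothing is pinned, so nothing to release
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let snapshot = QuerySnapshot::FrontendPinned(SnapshotGuard {
        committed_epoch: 42,
        unpin_tx: tx,
    });
    drop(snapshot); // query finished: the pinned variant releases its epoch
    assert_eq!(rx.recv().unwrap(), 42);
}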
diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs
index d0328a4d07e13..bd9912d40acb9 100644
--- a/src/frontend/src/scheduler/local.rs
+++ b/src/frontend/src/scheduler/local.rs
@@ -39,11 +39,10 @@ use tracing::debug;
 use uuid::Uuid;
 
 use super::plan_fragmenter::{PartitionInfo, QueryStageRef};
-use super::HummockSnapshotGuard;
 use crate::optimizer::plan_node::PlanNodeType;
 use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
 use crate::scheduler::task_context::FrontendBatchTaskContext;
-use crate::scheduler::SchedulerResult;
+use crate::scheduler::{QueryHummockSnapshot, SchedulerResult};
 use crate::session::{AuthContext, FrontendEnv};
 
 pub struct LocalQueryStream {
@@ -72,7 +71,7 @@ pub struct LocalQueryExecution {
     query: Query,
     front_env: FrontendEnv,
     // The snapshot will be released when LocalQueryExecution is dropped.
-    snapshot: HummockSnapshotGuard,
+    snapshot: QueryHummockSnapshot,
     auth_context: Arc<AuthContext>,
 }
 
@@ -81,7 +80,7 @@ impl LocalQueryExecution {
         query: Query,
         front_env: FrontendEnv,
         sql: S,
-        snapshot: HummockSnapshotGuard,
+        snapshot: QueryHummockSnapshot,
         auth_context: Arc<AuthContext>,
     ) -> Self {
         Self {
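Putting the pieces together, a session variable read in the frontend decides which snapshot both the local and the distributed scheduler receive. A condensed, self-contained sketch of the whole flow under stand-in types (not the crate's API):

// End-to-end sketch: session config -> snapshot selection -> scheduler input.
enum QuerySnapshot {
    FrontendPinned { committed_epoch: u64 },
    Other(u64),
}

fn snapshot_for_query(
    query_epoch: Option<u64>, // ConfigMap::get_query_epoch(); 0 means "unset"
    pinned_committed: u64,    // from the acquired snapshot guard
) -> Result<QuerySnapshot, String> {
    match query_epoch {
        Some(epoch) if epoch > pinned_committed => Err(format!(
            "cannot query with future epoch: {}, current committed epoch: {}",
            epoch, pinned_committed
        )),
        Some(epoch) => Ok(QuerySnapshot::Other(epoch)),
        None => Ok(QuerySnapshot::FrontendPinned {
            committed_epoch: pinned_committed,
        }),
    }
}

fn main() {
    // SET QUERY_EPOCH TO 1: read far in the past (no rows in the e2e test).
    assert!(matches!(
        snapshot_for_query(Some(1), 1 << 16),
        Ok(QuerySnapshot::Other(1))
    ));
    // SET QUERY_EPOCH TO 0: unset, fall back to the pinned snapshot.
    assert!(matches!(
        snapshot_for_query(None, 1 << 16),
        Ok(QuerySnapshot::FrontendPinned { .. })
    ));
}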