feat(frontend): support setting query epoch
zwang28 committed Dec 10, 2022
1 parent c5c7d1e commit d9df1f0
Showing 7 changed files with 134 additions and 22 deletions.
15 changes: 15 additions & 0 deletions e2e_test/batch/basic/query.slt.part
@@ -30,6 +30,21 @@ select count(*) from t3;
----
1

statement ok
SET QUERY_EPOCH TO 1;

query III
select t3.* from t3;
----

statement ok
SET QUERY_EPOCH TO 0;

query III
select t3.* from t3;
----
1 2 NULL

statement ok
drop table t3;

70 changes: 69 additions & 1 deletion src/common/src/session_config/mod.rs
@@ -24,10 +24,11 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD};

use crate::error::{ErrorCode, RwError};
use crate::session_config::transaction_isolation_level::IsolationLevel;
use crate::util::epoch::Epoch;

// This is a hack: &'static str is not allowed as a const generics argument.
// TODO: refine this using the adt_const_params feature.
const CONFIG_KEYS: [&str; 10] = [
const CONFIG_KEYS: [&str; 11] = [
"RW_IMPLICIT_FLUSH",
"CREATE_COMPACTION_GROUP_FOR_MV",
"QUERY_MODE",
@@ -38,6 +39,7 @@ const CONFIG_KEYS: [&str; 10] = [
"MAX_SPLIT_RANGE_GAP",
"SEARCH_PATH",
"TRANSACTION ISOLATION LEVEL",
"QUERY_EPOCH",
];

// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
@@ -52,6 +54,7 @@ const BATCH_ENABLE_LOOKUP_JOIN: usize = 6;
const MAX_SPLIT_RANGE_GAP: usize = 7;
const SEARCH_PATH: usize = 8;
const TRANSACTION_ISOLATION_LEVEL: usize = 9;
const QUERY_EPOCH: usize = 10;
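
A side note on the indexing pattern above: since &'static str is not allowed as a const generics argument (per the hack comment at the top of the file), each setting is identified by a usize index into CONFIG_KEYS, and QUERY_EPOCH takes the new slot 10. A minimal sketch of the trick, using hypothetical simplified names rather than the real risingwave types:

// Sketch: resolve a setting's name from its const-generic index.
const KEYS: [&str; 2] = ["RW_IMPLICIT_FLUSH", "QUERY_EPOCH"];
const QUERY_EPOCH_IDX: usize = 1;

struct ConfigI64<const NAME: usize>(i64);

impl<const NAME: usize> ConfigI64<NAME> {
    fn entry_name() -> &'static str {
        KEYS[NAME]
    }
}

fn main() {
    type QueryEpoch = ConfigI64<QUERY_EPOCH_IDX>;
    assert_eq!(QueryEpoch::entry_name(), "QUERY_EPOCH");
}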

trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
fn entry_name() -> &'static str;
@@ -184,6 +187,51 @@ impl<const NAME: usize, const DEFAULT: i32> TryFrom<&[&str]> for ConfigI32<NAME,
}
}

struct ConfigI64<const NAME: usize, const DEFAULT: i64 = 0>(i64);

impl<const NAME: usize, const DEFAULT: i64> Default for ConfigI64<NAME, DEFAULT> {
fn default() -> Self {
ConfigI64(DEFAULT)
}
}

impl<const NAME: usize, const DEFAULT: i64> Deref for ConfigI64<NAME, DEFAULT> {
type Target = i64;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<const NAME: usize, const DEFAULT: i64> ConfigEntry for ConfigI64<NAME, DEFAULT> {
fn entry_name() -> &'static str {
CONFIG_KEYS[NAME]
}
}

impl<const NAME: usize, const DEFAULT: i64> TryFrom<&[&str]> for ConfigI64<NAME, DEFAULT> {
type Error = RwError;

fn try_from(value: &[&str]) -> Result<Self, Self::Error> {
if value.len() != 1 {
return Err(ErrorCode::InternalError(format!(
"SET {} takes only one argument",
Self::entry_name()
))
.into());
}

let s = value[0];
s.parse::<i64>().map(ConfigI64).map_err(|_e| {
ErrorCode::InvalidConfigValue {
config_entry: Self::entry_name().to_string(),
config_value: s.to_string(),
}
.into()
})
}
}

pub struct VariableInfo {
pub name: String,
pub setting: String,
@@ -198,6 +246,7 @@ type ExtraFloatDigit = ConfigI32<EXTRA_FLOAT_DIGITS, 1>;
type DateStyle = ConfigString<DATE_STYLE>;
type BatchEnableLookupJoin = ConfigBool<BATCH_ENABLE_LOOKUP_JOIN, false>;
type MaxSplitRangeGap = ConfigI32<MAX_SPLIT_RANGE_GAP, 8>;
type QueryEpoch = ConfigI64<QUERY_EPOCH, 0>;

#[derive(Default)]
pub struct ConfigMap {
@@ -234,6 +283,9 @@ pub struct ConfigMap {

/// see <https://www.postgresql.org/docs/current/transaction-iso.html>
transaction_isolation_level: IsolationLevel,

/// Select as of a specific epoch.
query_epoch: QueryEpoch,
}

impl ConfigMap {
@@ -257,6 +309,8 @@ impl ConfigMap {
self.max_split_range_gap = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(SearchPath::entry_name()) {
self.search_path = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
self.query_epoch = val.as_slice().try_into()?;
} else {
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
}
@@ -285,6 +339,8 @@ impl ConfigMap {
Ok(self.search_path.to_string())
} else if key.eq_ignore_ascii_case(IsolationLevel::entry_name()) {
Ok(self.transaction_isolation_level.to_string())
} else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
Ok(self.query_epoch.to_string())
} else {
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
}
@@ -336,6 +392,11 @@ impl ConfigMap {
name: SearchPath::entry_name().to_lowercase(),
setting : self.search_path.to_string(),
description : String::from("Sets the order in which schemas are searched when an object (table, data type, function, etc.) is referenced by a simple name with no schema specified")
},
VariableInfo {
name: QueryEpoch::entry_name().to_lowercase(),
setting : self.query_epoch.to_string(),
description : String::from("Sets the historical epoch for querying data. If 0, querying latest data.")
}
]
}
@@ -379,4 +440,11 @@ impl ConfigMap {
pub fn get_search_path(&self) -> SearchPath {
self.search_path.clone()
}

pub fn get_query_epoch(&self) -> Option<Epoch> {
if self.query_epoch.0 != 0 {
return Some((self.query_epoch.0 as u64).into());
}
None
}
}
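
Taken together, the additions to this file give QUERY_EPOCH the usual session-variable lifecycle: parsed on SET, printed on SHOW, and exposed to the planner with a zero-means-latest convention. A hedged round-trip sketch of that behavior, with simplified stand-in types rather than the real ConfigMap API:

// Sketch of the SET/GET round trip for QUERY_EPOCH.
struct Epoch(u64);

#[derive(Default)]
struct SessionConfig {
    query_epoch: i64, // default 0 = no override
}

impl SessionConfig {
    // Modeled on ConfigMap::set: exactly one argument, parsed as i64.
    fn set_query_epoch(&mut self, args: &[&str]) -> Result<(), String> {
        if args.len() != 1 {
            return Err("SET QUERY_EPOCH takes only one argument".into());
        }
        self.query_epoch = args[0]
            .parse()
            .map_err(|_| format!("invalid config value: {}", args[0]))?;
        Ok(())
    }

    // Modeled on ConfigMap::get_query_epoch: 0 means "query the latest data".
    fn get_query_epoch(&self) -> Option<Epoch> {
        (self.query_epoch != 0).then(|| Epoch(self.query_epoch as u64))
    }
}

fn main() {
    let mut cfg = SessionConfig::default();
    assert!(cfg.get_query_epoch().is_none()); // default: read latest
    cfg.set_query_epoch(&["42"]).unwrap();
    assert_eq!(cfg.get_query_epoch().map(|e| e.0), Some(42));
    cfg.set_query_epoch(&["0"]).unwrap(); // reset to latest, as in the e2e test
    assert!(cfg.get_query_epoch().is_none());
}
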
23 changes: 17 additions & 6 deletions src/frontend/src/handler/query.rs
@@ -20,6 +20,7 @@ use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use postgres_types::FromSql;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::session_config::QueryMode;
@@ -35,7 +36,7 @@ use crate::planner::Planner;
use crate::scheduler::plan_fragmenter::Query;
use crate::scheduler::{
BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
HummockSnapshotGuard, LocalQueryExecution, LocalQueryStream,
LocalQueryExecution, LocalQueryStream, QueryHummockSnapshot,
};
use crate::session::SessionImpl;
use crate::PlanRef;
@@ -133,17 +134,27 @@ pub async fn handle_query(
let hummock_snapshot_manager = session.env().hummock_snapshot_manager();
let query_id = query.query_id().clone();
let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?;

let mut query_snapshot = QueryHummockSnapshot::FrontendPinned(pinned_snapshot);
if let Some(query_epoch) = session.config().get_query_epoch() {
if query_epoch.0 > query_snapshot.get_committed_epoch() {
bail!(
"cannot query with future epoch: {}, current committed epoch: {}",
query_epoch.0,
query_snapshot.get_committed_epoch()
);
}
query_snapshot = QueryHummockSnapshot::Other(query_epoch);
}
match query_mode {
QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
local_execute(session.clone(), query, pinned_snapshot).await?,
local_execute(session.clone(), query, query_snapshot).await?,
column_types,
format,
)),
// Local mode does not support cancelling tasks.
QueryMode::Distributed => {
PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
distribute_execute(session.clone(), query, pinned_snapshot).await?,
distribute_execute(session.clone(), query, query_snapshot).await?,
column_types,
format,
))
Expand Down Expand Up @@ -224,7 +235,7 @@ fn to_statement_type(stmt: &Statement) -> Result<StatementType> {
pub async fn distribute_execute(
session: Arc<SessionImpl>,
query: Query,
pinned_snapshot: HummockSnapshotGuard,
pinned_snapshot: QueryHummockSnapshot,
) -> Result<DistributedQueryStream> {
let execution_context: ExecutionContextRef = ExecutionContext::new(session.clone()).into();
let query_manager = session.env().query_manager().clone();
@@ -238,7 +249,7 @@ pub async fn local_execute(
pub async fn local_execute(
session: Arc<SessionImpl>,
query: Query,
pinned_snapshot: HummockSnapshotGuard,
pinned_snapshot: QueryHummockSnapshot,
) -> Result<LocalQueryStream> {
let front_env = session.env();

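The new block in handle_query above is the feature's safety check: a user-specified epoch may point into the past but not the future. A hedged sketch of that decision logic, with plain integers standing in for the real snapshot and epoch types:

// Sketch: pick the epoch a query should run against.
fn choose_epoch(committed_epoch: u64, query_epoch: Option<u64>) -> Result<u64, String> {
    match query_epoch {
        // Reject epochs newer than the committed one; that data does not exist yet.
        Some(e) if e > committed_epoch => Err(format!(
            "cannot query with future epoch: {}, current committed epoch: {}",
            e, committed_epoch
        )),
        // Historical read as of the requested epoch.
        Some(e) => Ok(e),
        // No override: read from the frontend-pinned snapshot.
        None => Ok(committed_epoch),
    }
}

fn main() {
    assert!(choose_epoch(100, Some(200)).is_err());
    assert_eq!(choose_epoch(100, Some(50)), Ok(50));
    assert_eq!(choose_epoch(100, None), Ok(100));
}
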
11 changes: 5 additions & 6 deletions src/frontend/src/scheduler/distributed/query.rs
@@ -36,8 +36,7 @@ use crate::scheduler::distributed::StageExecution;
use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID};
use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
use crate::scheduler::{
ExecutionContextRef, HummockSnapshotGuard, PinnedHummockSnapshot, SchedulerError,
SchedulerResult,
ExecutionContextRef, QueryHummockSnapshot, SchedulerError, SchedulerResult,
};

/// Message sent to a `QueryRunner` to control its execution.
@@ -115,7 +114,7 @@ impl QueryExecution {
&self,
context: ExecutionContextRef,
worker_node_manager: WorkerNodeManagerRef,
pinned_snapshot: HummockSnapshotGuard,
pinned_snapshot: QueryHummockSnapshot,
compute_client_pool: ComputeClientPoolRef,
catalog_reader: CatalogReader,
query_execution_info: QueryExecutionInfoRef,
@@ -189,7 +188,7 @@ impl QueryExecution {

fn gen_stage_executions(
&self,
pinned_snapshot: &PinnedHummockSnapshot,
pinned_snapshot: &QueryHummockSnapshot,
context: ExecutionContextRef,
worker_node_manager: WorkerNodeManagerRef,
compute_client_pool: ComputeClientPoolRef,
@@ -225,7 +224,7 @@
}

impl QueryRunner {
async fn run(mut self, pinned_snapshot: PinnedHummockSnapshot) {
async fn run(mut self, pinned_snapshot: QueryHummockSnapshot) {
// Start leaf stages.
let leaf_stages = self.query.leaf_stages();
for stage_id in &leaf_stages {
@@ -424,7 +423,7 @@ pub(crate) mod tests {
.start(
ExecutionContext::new(SessionImpl::mock().into()).into(),
worker_node_manager,
pinned_snapshot,
pinned_snapshot.into(),
compute_client_pool,
catalog_reader,
query_execution_info,
4 changes: 2 additions & 2 deletions src/frontend/src/scheduler/distributed/query_manager.rs
@@ -34,7 +34,7 @@ use crate::catalog::catalog_service::CatalogReader;
use crate::scheduler::plan_fragmenter::{Query, QueryId};
use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
use crate::scheduler::{
ExecutionContextRef, HummockSnapshotGuard, HummockSnapshotManagerRef, SchedulerResult,
ExecutionContextRef, HummockSnapshotManagerRef, QueryHummockSnapshot, SchedulerResult,
};

pub struct DistributedQueryStream {
@@ -160,7 +160,7 @@ impl QueryManager {
&self,
context: ExecutionContextRef,
query: Query,
pinned_snapshot: HummockSnapshotGuard,
pinned_snapshot: QueryHummockSnapshot,
) -> SchedulerResult<DistributedQueryStream> {
let query_id = query.query_id.clone();
let query_execution = Arc::new(QueryExecution::new(query, context.session().id()));
26 changes: 23 additions & 3 deletions src/frontend/src/scheduler/hummock_snapshot_manager.rs
@@ -19,7 +19,7 @@ use std::time::{Duration, Instant};

use anyhow::anyhow;
use arc_swap::ArcSwap;
use risingwave_common::util::epoch::INVALID_EPOCH;
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_pb::hummock::HummockSnapshot;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot::{channel as once_channel, Sender as Callback};
@@ -32,7 +32,27 @@ use crate::scheduler::{SchedulerError, SchedulerResult};
const UNPIN_INTERVAL_SECS: u64 = 10;

pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
pub type PinnedHummockSnapshot = HummockSnapshotGuard;
pub enum QueryHummockSnapshot {
FrontendPinned(HummockSnapshotGuard),
/// Any other arbitrary epoch, e.g. one specified by the user.
/// Availability and consistency of the underlying data should be guaranteed accordingly.
Other(Epoch),
}

impl QueryHummockSnapshot {
pub fn get_committed_epoch(&self) -> u64 {
match self {
QueryHummockSnapshot::FrontendPinned(s) => s.snapshot.committed_epoch,
QueryHummockSnapshot::Other(s) => s.0,
}
}
}

impl From<HummockSnapshotGuard> for QueryHummockSnapshot {
fn from(s: HummockSnapshotGuard) -> Self {
QueryHummockSnapshot::FrontendPinned(s)
}
}

type SnapshotRef = Arc<ArcSwap<HummockSnapshot>>;

@@ -165,7 +185,7 @@ impl HummockSnapshotManager {
}
}

pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult<PinnedHummockSnapshot> {
pub async fn acquire(&self, query_id: &QueryId) -> SchedulerResult<HummockSnapshotGuard> {
let (sender, rc) = once_channel();
let msg = EpochOperation::RequestEpoch {
query_id: query_id.clone(),
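
The From impl above is what keeps this change mechanical at existing call sites: anywhere a pinned guard was passed before, pinned_snapshot.into() now yields a QueryHummockSnapshot, as in the updated test in distributed/query.rs. A hedged sketch of the conversion, with stand-in types rather than the real frontend ones:

// Sketch: both snapshot sources expose a committed epoch.
struct HummockSnapshotGuard {
    committed_epoch: u64, // the real guard also unpins the snapshot on drop
}

struct Epoch(u64);

enum QueryHummockSnapshot {
    FrontendPinned(HummockSnapshotGuard),
    Other(Epoch),
}

impl From<HummockSnapshotGuard> for QueryHummockSnapshot {
    fn from(s: HummockSnapshotGuard) -> Self {
        QueryHummockSnapshot::FrontendPinned(s)
    }
}

fn committed_epoch(s: &QueryHummockSnapshot) -> u64 {
    match s {
        QueryHummockSnapshot::FrontendPinned(g) => g.committed_epoch,
        QueryHummockSnapshot::Other(e) => e.0,
    }
}

fn main() {
    let guard = HummockSnapshotGuard { committed_epoch: 10 };
    let snap: QueryHummockSnapshot = guard.into();
    assert_eq!(committed_epoch(&snap), 10);
    assert_eq!(committed_epoch(&QueryHummockSnapshot::Other(Epoch(3))), 3);
}
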
7 changes: 3 additions & 4 deletions src/frontend/src/scheduler/local.rs
@@ -39,11 +39,10 @@ use tracing::debug;
use uuid::Uuid;

use super::plan_fragmenter::{PartitionInfo, QueryStageRef};
use super::HummockSnapshotGuard;
use crate::optimizer::plan_node::PlanNodeType;
use crate::scheduler::plan_fragmenter::{ExecutionPlanNode, Query, StageId};
use crate::scheduler::task_context::FrontendBatchTaskContext;
use crate::scheduler::SchedulerResult;
use crate::scheduler::{QueryHummockSnapshot, SchedulerResult};
use crate::session::{AuthContext, FrontendEnv};

pub struct LocalQueryStream {
@@ -72,7 +71,7 @@ pub struct LocalQueryExecution {
query: Query,
front_env: FrontendEnv,
// The snapshot will be released when LocalQueryExecution is dropped.
snapshot: HummockSnapshotGuard,
snapshot: QueryHummockSnapshot,
auth_context: Arc<AuthContext>,
}

@@ -81,7 +80,7 @@ impl LocalQueryExecution {
query: Query,
front_env: FrontendEnv,
sql: S,
snapshot: HummockSnapshotGuard,
snapshot: QueryHummockSnapshot,
auth_context: Arc<AuthContext>,
) -> Self {
Self {
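
The comment in LocalQueryExecution above ("The snapshot will be released when LocalQueryExecution is dropped") leans on RAII: holding the guard keeps the snapshot pinned, and dropping it releases the pin. A generic sketch of that pattern with a hypothetical guard, not the real HummockSnapshotGuard:

// Sketch: release a pinned snapshot when its guard goes out of scope.
struct SnapshotGuard {
    epoch: u64,
}

impl Drop for SnapshotGuard {
    fn drop(&mut self) {
        // The real guard would ask the snapshot manager to unpin;
        // here we only print to show when release happens.
        println!("unpin snapshot at epoch {}", self.epoch);
    }
}

fn main() {
    {
        let _guard = SnapshotGuard { epoch: 42 };
        println!("query executes while the snapshot is pinned");
    } // _guard dropped here: the snapshot is released
    println!("after release");
}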
