feat(batch): query historical epoch data
zwang28 committed Dec 12, 2022
1 parent e3c37a8 commit d8d5c00
Showing 35 changed files with 690 additions and 341 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

4 changes: 4 additions & 0 deletions proto/backup_service.proto
@@ -4,6 +4,10 @@ package backup_service;
 
 option optimize_for = SPEED;
 
+message MetaBackupManifestId {
+  uint64 id = 1;
+}
+
 enum BackupJobStatus {
   UNSPECIFIED = 0;
   RUNNING = 1;
3 changes: 3 additions & 0 deletions proto/meta.proto
@@ -8,6 +8,7 @@ import "hummock.proto";
 import "source.proto";
 import "stream_plan.proto";
 import "user.proto";
+import "backup_service.proto";
 
 option optimize_for = SPEED;
 
@@ -205,6 +206,7 @@ message MetaSnapshot {
   hummock.HummockVersion hummock_version = 12;
 
   SnapshotVersion version = 13;
+  backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
 }
 
 message SubscribeResponse {
@@ -232,6 +234,7 @@ message SubscribeResponse {
     hummock.HummockSnapshot hummock_snapshot = 14;
     hummock.HummockVersionDeltas hummock_version_deltas = 15;
     MetaSnapshot snapshot = 16;
+    backup_service.MetaBackupManifestId meta_backup_manifest_id = 17;
   }
 }
1 change: 1 addition & 0 deletions src/common/common_service/src/observer_manager.rs
@@ -130,6 +130,7 @@ where
                 version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id
             }
             Info::HummockSnapshot(_) => true,
+            Info::MetaBackupManifestId(_) => true,
             Info::Snapshot(_) => unreachable!(),
         });
70 changes: 69 additions & 1 deletion src/common/src/session_config/mod.rs
@@ -24,10 +24,11 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD};
 
 use crate::error::{ErrorCode, RwError};
 use crate::session_config::transaction_isolation_level::IsolationLevel;
+use crate::util::epoch::Epoch;
 
 // This is a hack, &'static str is not allowed as a const generics argument.
 // TODO: refine this using the adt_const_params feature.
-const CONFIG_KEYS: [&str; 10] = [
+const CONFIG_KEYS: [&str; 11] = [
     "RW_IMPLICIT_FLUSH",
     "CREATE_COMPACTION_GROUP_FOR_MV",
     "QUERY_MODE",
@@ -38,6 +39,7 @@ const CONFIG_KEYS: [&str; 10] = [
     "MAX_SPLIT_RANGE_GAP",
     "SEARCH_PATH",
     "TRANSACTION ISOLATION LEVEL",
+    "QUERY_EPOCH",
 ];

// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
@@ -52,6 +54,7 @@ const BATCH_ENABLE_LOOKUP_JOIN: usize = 6;
 const MAX_SPLIT_RANGE_GAP: usize = 7;
 const SEARCH_PATH: usize = 8;
 const TRANSACTION_ISOLATION_LEVEL: usize = 9;
+const QUERY_EPOCH: usize = 10;
 
 trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
     fn entry_name() -> &'static str;
@@ -184,6 +187,51 @@ impl<const NAME: usize, const DEFAULT: i32> TryFrom<&[&str]> for ConfigI32<NAME,
     }
 }
 
+struct ConfigU64<const NAME: usize, const DEFAULT: u64 = 0>(u64);
+
+impl<const NAME: usize, const DEFAULT: u64> Default for ConfigU64<NAME, DEFAULT> {
+    fn default() -> Self {
+        ConfigU64(DEFAULT)
+    }
+}
+
+impl<const NAME: usize, const DEFAULT: u64> Deref for ConfigU64<NAME, DEFAULT> {
+    type Target = u64;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<const NAME: usize, const DEFAULT: u64> ConfigEntry for ConfigU64<NAME, DEFAULT> {
+    fn entry_name() -> &'static str {
+        CONFIG_KEYS[NAME]
+    }
+}
+
+impl<const NAME: usize, const DEFAULT: u64> TryFrom<&[&str]> for ConfigU64<NAME, DEFAULT> {
+    type Error = RwError;
+
+    fn try_from(value: &[&str]) -> Result<Self, Self::Error> {
+        if value.len() != 1 {
+            return Err(ErrorCode::InternalError(format!(
+                "SET {} takes only one argument",
+                Self::entry_name()
+            ))
+            .into());
+        }
+
+        let s = value[0];
+        s.parse::<u64>().map(ConfigU64).map_err(|_e| {
+            ErrorCode::InvalidConfigValue {
+                config_entry: Self::entry_name().to_string(),
+                config_value: s.to_string(),
+            }
+            .into()
+        })
+    }
+}
+
 pub struct VariableInfo {
     pub name: String,
     pub setting: String,
@@ -198,6 +246,7 @@ type ExtraFloatDigit = ConfigI32<EXTRA_FLOAT_DIGITS, 1>;
 type DateStyle = ConfigString<DATE_STYLE>;
 type BatchEnableLookupJoin = ConfigBool<BATCH_ENABLE_LOOKUP_JOIN, false>;
 type MaxSplitRangeGap = ConfigI32<MAX_SPLIT_RANGE_GAP, 8>;
+type QueryEpoch = ConfigU64<QUERY_EPOCH, 0>;
 
 #[derive(Default)]
 pub struct ConfigMap {
@@ -234,6 +283,9 @@ pub struct ConfigMap {
 
     /// see <https://www.postgresql.org/docs/current/transaction-iso.html>
     transaction_isolation_level: IsolationLevel,
+
+    /// Select as of a specific epoch.
+    query_epoch: QueryEpoch,
 }

impl ConfigMap {
@@ -257,6 +309,8 @@ impl ConfigMap {
             self.max_split_range_gap = val.as_slice().try_into()?;
         } else if key.eq_ignore_ascii_case(SearchPath::entry_name()) {
             self.search_path = val.as_slice().try_into()?;
+        } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
+            self.query_epoch = val.as_slice().try_into()?;
         } else {
             return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
         }
@@ -285,6 +339,8 @@ impl ConfigMap {
             Ok(self.search_path.to_string())
         } else if key.eq_ignore_ascii_case(IsolationLevel::entry_name()) {
             Ok(self.transaction_isolation_level.to_string())
+        } else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
+            Ok(self.query_epoch.to_string())
         } else {
             Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
         }
@@ -336,6 +392,11 @@ impl ConfigMap {
                 name: SearchPath::entry_name().to_lowercase(),
                 setting: self.search_path.to_string(),
                 description: String::from("Sets the order in which schemas are searched when an object (table, data type, function, etc.) is referenced by a simple name with no schema specified")
-            }
+            },
+            VariableInfo {
+                name: QueryEpoch::entry_name().to_lowercase(),
+                setting: self.query_epoch.to_string(),
+                description: String::from("Sets the historical epoch for querying data. If 0, the latest data is queried.")
+            }
         ]
     }
@@ -379,4 +440,11 @@ impl ConfigMap {
     pub fn get_search_path(&self) -> SearchPath {
         self.search_path.clone()
     }
+
+    pub fn get_query_epoch(&self) -> Option<Epoch> {
+        if self.query_epoch.0 != 0 {
+            return Some((self.query_epoch.0).into());
+        }
+        None
+    }
 }
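
The new QUERY_EPOCH entry follows the same const-generic pattern as the existing ConfigI32/ConfigBool variables: the variable name is an index into CONFIG_KEYS and the default is a const parameter. Below is a minimal standalone sketch of that pattern (names and error types compressed for illustration; the real types live in risingwave_common::session_config). The SQL-level usage — presumably `SET QUERY_EPOCH = <epoch>`, with 0 restoring reads of the latest data — is an assumption based on how the other session variables are set.

const CONFIG_KEYS: [&str; 1] = ["QUERY_EPOCH"];
const QUERY_EPOCH: usize = 0;

struct ConfigU64<const NAME: usize, const DEFAULT: u64 = 0>(u64);

impl<const NAME: usize, const DEFAULT: u64> Default for ConfigU64<NAME, DEFAULT> {
    fn default() -> Self {
        ConfigU64(DEFAULT)
    }
}

impl<const NAME: usize, const DEFAULT: u64> ConfigU64<NAME, DEFAULT> {
    fn entry_name() -> &'static str {
        CONFIG_KEYS[NAME]
    }

    // Parse the argument list of `SET <name> = <value>`.
    fn parse(args: &[&str]) -> Result<Self, String> {
        match args {
            [s] => s
                .parse::<u64>()
                .map(ConfigU64)
                .map_err(|_| format!("invalid value for {}: {}", Self::entry_name(), s)),
            _ => Err(format!("SET {} takes only one argument", Self::entry_name())),
        }
    }
}

type QueryEpoch = ConfigU64<QUERY_EPOCH, 0>;

fn main() {
    // A value of 0 (the default) maps to None, i.e. "query the latest data",
    // mirroring get_query_epoch() above.
    let epoch = QueryEpoch::parse(&["3208227938304"]).unwrap();
    let historical = (epoch.0 != 0).then_some(epoch.0);
    println!("{} = {:?}", QueryEpoch::entry_name(), historical);
    assert_eq!(QueryEpoch::default().0, 0);
}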
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
@@ -116,7 +116,7 @@ pub async fn compute_node_serve(
     let state_store = StateStoreImpl::new(
         &opts.state_store,
         &opts.file_cache_dir,
-        storage_config.clone(),
+        &config,
         hummock_meta_client.clone(),
         state_store_metrics.clone(),
         object_store_metrics,
8 changes: 6 additions & 2 deletions src/ctl/src/common/hummock_service.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::{anyhow, bail, Result};
-use risingwave_common::config::StorageConfig;
+use risingwave_common::config::{RwConfig, StorageConfig};
 use risingwave_rpc_client::MetaClient;
 use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
 use risingwave_storage::hummock::{HummockStorage, TieredCacheMetricsBuilder};
@@ -100,6 +100,10 @@ For `./risedev apply-compose-deploy` users,
             share_buffer_compaction_worker_threads_number: 0,
             ..Default::default()
         };
+        let rw_config = RwConfig {
+            storage: config.clone(),
+            ..Default::default()
+        };
 
         tracing::info!("using Hummock config: {:#?}", config);
 
@@ -112,7 +116,7 @@ For `./risedev apply-compose-deploy` users,
         let state_store_impl = StateStoreImpl::new(
             &self.hummock_url,
             "",
-            Arc::new(config),
+            &rw_config,
             Arc::new(MonitoredHummockMetaClient::new(
                 meta_client.clone(),
                 metrics.hummock_metrics.clone(),
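
Both this call site and the compute-node one above now hand StateStoreImpl::new the whole RwConfig rather than just an Arc<StorageConfig> — plausibly so the state store can reach config sections beyond storage (for example, backup-related settings used when serving historical reads; that motivation is an inference, not stated in the diff). A standalone sketch of the wrapping pattern, with stub types standing in for the real config structs:

// Stub config types for illustration only; the real StorageConfig/RwConfig
// in risingwave_common::config carry many more fields.
#[derive(Debug, Default)]
struct StorageConfig {
    share_buffer_compaction_worker_threads_number: u64,
}

#[derive(Debug, Default)]
struct RwConfig {
    storage: StorageConfig,
    // ...other sections (server, batch, streaming) elided in this sketch.
}

fn main() {
    let config = StorageConfig {
        share_buffer_compaction_worker_threads_number: 0,
    };
    // Wrap the storage section in a full RwConfig, defaulting everything else,
    // exactly as the `rw_config` construction above does.
    let rw_config = RwConfig {
        storage: config,
        ..Default::default()
    };
    println!("{rw_config:#?}");
}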
23 changes: 17 additions & 6 deletions src/frontend/src/handler/query.rs
@@ -20,6 +20,7 @@ use itertools::Itertools;
 use pgwire::pg_field_descriptor::PgFieldDescriptor;
 use pgwire::pg_response::{PgResponse, StatementType};
 use postgres_types::FromSql;
+use risingwave_common::bail;
 use risingwave_common::catalog::Schema;
 use risingwave_common::error::{ErrorCode, Result, RwError};
 use risingwave_common::session_config::QueryMode;
@@ -35,7 +36,7 @@ use crate::planner::Planner;
 use crate::scheduler::plan_fragmenter::Query;
 use crate::scheduler::{
     BatchPlanFragmenter, DistributedQueryStream, ExecutionContext, ExecutionContextRef,
-    HummockSnapshotGuard, LocalQueryExecution, LocalQueryStream,
+    LocalQueryExecution, LocalQueryStream, QueryHummockSnapshot,
 };
 use crate::session::SessionImpl;
 use crate::PlanRef;
@@ -133,17 +134,27 @@ pub async fn handle_query(
     let hummock_snapshot_manager = session.env().hummock_snapshot_manager();
     let query_id = query.query_id().clone();
     let pinned_snapshot = hummock_snapshot_manager.acquire(&query_id).await?;
-
+    let mut query_snapshot = QueryHummockSnapshot::FrontendPinned(pinned_snapshot);
+    if let Some(query_epoch) = session.config().get_query_epoch() {
+        if query_epoch.0 > query_snapshot.get_committed_epoch() {
+            bail!(
+                "cannot query with future epoch: {}, current committed epoch: {}",
+                query_epoch.0,
+                query_snapshot.get_committed_epoch()
+            );
+        }
+        query_snapshot = QueryHummockSnapshot::Other(query_epoch);
+    }
     match query_mode {
         QueryMode::Local => PgResponseStream::LocalQuery(DataChunkToRowSetAdapter::new(
-            local_execute(session.clone(), query, pinned_snapshot).await?,
+            local_execute(session.clone(), query, query_snapshot).await?,
             column_types,
             format,
         )),
         // Local mode does not support canceling tasks.
         QueryMode::Distributed => {
             PgResponseStream::DistributedQuery(DataChunkToRowSetAdapter::new(
-                distribute_execute(session.clone(), query, pinned_snapshot).await?,
+                distribute_execute(session.clone(), query, query_snapshot).await?,
                 column_types,
                 format,
             ))
@@ -224,7 +235,7 @@ fn to_statement_type(stmt: &Statement) -> Result<StatementType> {
 pub async fn distribute_execute(
     session: Arc<SessionImpl>,
     query: Query,
-    pinned_snapshot: HummockSnapshotGuard,
+    pinned_snapshot: QueryHummockSnapshot,
 ) -> Result<DistributedQueryStream> {
     let execution_context: ExecutionContextRef = ExecutionContext::new(session.clone()).into();
     let query_manager = session.env().query_manager().clone();
@@ -238,7 +249,7 @@ pub async fn distribute_execute(
 pub async fn local_execute(
     session: Arc<SessionImpl>,
     query: Query,
-    pinned_snapshot: HummockSnapshotGuard,
+    pinned_snapshot: QueryHummockSnapshot,
 ) -> Result<LocalQueryStream> {
     let front_env = session.env();
 
3 changes: 3 additions & 0 deletions src/frontend/src/observer/observer_manager.rs
@@ -75,6 +75,9 @@ impl ObserverState for FrontendObserverNode {
             Info::HummockVersionDeltas(_) => {
                 panic!("frontend node should not receive HummockVersionDeltas");
             }
+            Info::MetaBackupManifestId(_) => {
+                panic!("frontend node should not receive MetaBackupManifestId");
+            }
         }
     }
11 changes: 5 additions & 6 deletions src/frontend/src/scheduler/distributed/query.rs
@@ -36,8 +36,7 @@ use crate::scheduler::distributed::StageExecution;
 use crate::scheduler::plan_fragmenter::{Query, StageId, ROOT_TASK_ID, ROOT_TASK_OUTPUT_ID};
 use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
 use crate::scheduler::{
-    ExecutionContextRef, HummockSnapshotGuard, PinnedHummockSnapshot, SchedulerError,
-    SchedulerResult,
+    ExecutionContextRef, QueryHummockSnapshot, SchedulerError, SchedulerResult,
 };
 
 /// Message sent to a `QueryRunner` to control its execution.
@@ -115,7 +114,7 @@ impl QueryExecution {
         &self,
         context: ExecutionContextRef,
         worker_node_manager: WorkerNodeManagerRef,
-        pinned_snapshot: HummockSnapshotGuard,
+        pinned_snapshot: QueryHummockSnapshot,
         compute_client_pool: ComputeClientPoolRef,
         catalog_reader: CatalogReader,
         query_execution_info: QueryExecutionInfoRef,
@@ -189,7 +188,7 @@ impl QueryExecution {
 
     fn gen_stage_executions(
         &self,
-        pinned_snapshot: &PinnedHummockSnapshot,
+        pinned_snapshot: &QueryHummockSnapshot,
         context: ExecutionContextRef,
         worker_node_manager: WorkerNodeManagerRef,
         compute_client_pool: ComputeClientPoolRef,
@@ -225,7 +224,7 @@
 }
 
 impl QueryRunner {
-    async fn run(mut self, pinned_snapshot: PinnedHummockSnapshot) {
+    async fn run(mut self, pinned_snapshot: QueryHummockSnapshot) {
         // Start leaf stages.
         let leaf_stages = self.query.leaf_stages();
         for stage_id in &leaf_stages {
@@ -424,7 +423,7 @@ pub(crate) mod tests {
             .start(
                 ExecutionContext::new(SessionImpl::mock().into()).into(),
                 worker_node_manager,
-                pinned_snapshot,
+                pinned_snapshot.into(),
                 compute_client_pool,
                 catalog_reader,
                 query_execution_info,
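
The hunks above construct QueryHummockSnapshot::FrontendPinned and QueryHummockSnapshot::Other, call get_committed_epoch() on it, and convert a pinned guard with .into(), but the enum's definition sits in one of the files truncated from this view. The standalone sketch below is a plausible reconstruction inferred from those call sites; the stub types and the field layout of HummockSnapshotGuard are assumptions, not the real RisingWave definitions.

// Stub standing in for the epoch newtype in risingwave_common.
pub struct Epoch(pub u64);

// Stub for the frontend's pinned-snapshot guard.
pub struct HummockSnapshotGuard {
    pub committed_epoch: u64,
}

pub enum QueryHummockSnapshot {
    /// The snapshot pinned by the frontend for this query (the default path).
    FrontendPinned(HummockSnapshotGuard),
    /// A historical epoch supplied by the user through QUERY_EPOCH.
    Other(Epoch),
}

impl QueryHummockSnapshot {
    pub fn get_committed_epoch(&self) -> u64 {
        match self {
            QueryHummockSnapshot::FrontendPinned(guard) => guard.committed_epoch,
            QueryHummockSnapshot::Other(epoch) => epoch.0,
        }
    }
}

// The test hunk above passes `pinned_snapshot.into()`, which suggests a
// `From<HummockSnapshotGuard>` impl along these lines.
impl From<HummockSnapshotGuard> for QueryHummockSnapshot {
    fn from(guard: HummockSnapshotGuard) -> Self {
        QueryHummockSnapshot::FrontendPinned(guard)
    }
}

fn main() {
    let pinned = HummockSnapshotGuard { committed_epoch: 100 };
    let snapshot: QueryHummockSnapshot = pinned.into();
    assert_eq!(snapshot.get_committed_epoch(), 100);
    println!("committed epoch: {}", snapshot.get_committed_epoch());
}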
4 changes: 2 additions & 2 deletions src/frontend/src/scheduler/distributed/query_manager.rs
@@ -34,7 +34,7 @@ use crate::catalog::catalog_service::CatalogReader;
 use crate::scheduler::plan_fragmenter::{Query, QueryId};
 use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
 use crate::scheduler::{
-    ExecutionContextRef, HummockSnapshotGuard, HummockSnapshotManagerRef, SchedulerResult,
+    ExecutionContextRef, HummockSnapshotManagerRef, QueryHummockSnapshot, SchedulerResult,
 };
 
 pub struct DistributedQueryStream {
@@ -160,7 +160,7 @@ impl QueryManager {
         &self,
         context: ExecutionContextRef,
         query: Query,
-        pinned_snapshot: HummockSnapshotGuard,
+        pinned_snapshot: QueryHummockSnapshot,
     ) -> SchedulerResult<DistributedQueryStream> {
         let query_id = query.query_id.clone();
         let query_execution = Arc::new(QueryExecution::new(query, context.session().id()));
(Diff truncated: the remaining 24 of 35 changed files are not shown.)
