feat(batch): query historical epoch data #6840

Merged · 14 commits · Dec 19, 2022
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion ci/scripts/run-e2e-test.sh
@@ -77,7 +77,9 @@ if [[ "$RUN_META_BACKUP" -eq "1" ]]; then
chmod +x ./target/debug/backup-restore

test_root="src/storage/backup/integration_tests"
BACKUP_TEST_PREFIX_BIN="target/debug" BACKUP_TEST_PREFIX_DATA=".risingwave/data" bash "${test_root}/run_all.sh"
BACKUP_TEST_PREFIX_BIN="target/debug" \
BACKUP_TEST_PREFIX_CONFIG=".risingwave/config" \
bash "${test_root}/run_all.sh"
echo "--- Kill cluster"
cargo make kill
fi
26 changes: 26 additions & 0 deletions dashboard/proto/gen/backup_service.ts


33 changes: 32 additions & 1 deletion dashboard/proto/gen/meta.ts


4 changes: 4 additions & 0 deletions proto/backup_service.proto
@@ -4,6 +4,10 @@ package backup_service;

option optimize_for = SPEED;

message MetaBackupManifestId {
uint64 id = 1;
}

enum BackupJobStatus {
UNSPECIFIED = 0;
RUNNING = 1;
3 changes: 3 additions & 0 deletions proto/meta.proto
@@ -2,6 +2,7 @@ syntax = "proto3";

package meta;

import "backup_service.proto";
import "catalog.proto";
import "common.proto";
import "hummock.proto";
@@ -205,6 +206,7 @@ message MetaSnapshot {
hummock.HummockVersion hummock_version = 12;

SnapshotVersion version = 13;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
}

message SubscribeResponse {
@@ -232,6 +234,7 @@ message SubscribeResponse {
hummock.HummockSnapshot hummock_snapshot = 14;
hummock.HummockVersionDeltas hummock_version_deltas = 15;
MetaSnapshot snapshot = 16;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 17;
}
}

1 change: 1 addition & 0 deletions src/common/common_service/src/observer_manager.rs
@@ -130,6 +130,7 @@ where
version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id
}
Info::HummockSnapshot(_) => true,
Info::MetaBackupManifestId(_) => true,
Info::Snapshot(_) => unreachable!(),
});

70 changes: 69 additions & 1 deletion src/common/src/session_config/mod.rs
@@ -24,10 +24,11 @@ pub use search_path::{SearchPath, USER_NAME_WILD_CARD};

use crate::error::{ErrorCode, RwError};
use crate::session_config::transaction_isolation_level::IsolationLevel;
use crate::util::epoch::Epoch;

// This is a hack, &'static str is not allowed as a const generics argument.
// TODO: refine this using the adt_const_params feature.
const CONFIG_KEYS: [&str; 10] = [
const CONFIG_KEYS: [&str; 11] = [
"RW_IMPLICIT_FLUSH",
"CREATE_COMPACTION_GROUP_FOR_MV",
"QUERY_MODE",
@@ -38,6 +39,7 @@ const CONFIG_KEYS: [&str; 10] = [
"MAX_SPLIT_RANGE_GAP",
"SEARCH_PATH",
"TRANSACTION ISOLATION LEVEL",
"QUERY_EPOCH",
Collaborator
Users don't understand epochs, so asking the user to manually specify the exact query epoch for a time-travel query is not realistic. In terms of syntax, I did a quick check of PG's docs and sadly it seems to have deprecated its time-travel query SQL command. I suggest we do either of the following:

  1. Support querying the backup epochs as well as looking up backup epochs by timestamp. With that, the user can find out the exact query epoch to use for a time-travel query.
  2. Support AS OF syntax in SELECT: SELECT ... FROM ... AS OF <timestamp>. This is easier to use but may confuse users, because the syntax seems to indicate that we can support time-travel queries without a backup, which is not the case.

For now, I think 1) is sufficient since we don't expect users to run time-travel queries very often. cc @fuyufjh any suggestions?

Collaborator
However, I think it is okay to keep the QUERY_EPOCH session config for admin usage and debugging.

Contributor Author @zwang28 · Dec 15, 2022

I think both options are fine.

  • With option 1, the user can get an exact epoch and still use it via QUERY_EPOCH, right? Or shall we use a less ambiguous session config, e.g. QUERY_BACKUP_EPOCH?
  • To implement option 2, the frontend needs to be aware of (and sync) the backup manifest, so it can choose a best-fit query epoch for the given arbitrary <timestamp>.

Collaborator

> I think both options are fine.
>
>   • With option 1, the user can get an exact epoch and still use it via QUERY_EPOCH, right? Or shall we use a less ambiguous session config, e.g. QUERY_BACKUP_EPOCH?
>   • To implement option 2, the frontend needs to be aware of (and sync) the backup manifest, so it can choose a best-fit query epoch for the given arbitrary <timestamp>.

Sorry for the late reply. For option 1, I mean we provide a way to get the backed-up epochs via a SQL statement (preferred) or risectl. Then we can set QUERY_EPOCH accordingly.

Contributor Author @zwang28 · Dec 19, 2022

Followed option 1.
Added a new system table: rw_catalog.rw_meta_snapshot. Querying it returns all available backups along with their human-readable timestamp ranges (safe_epoch_ts and max_committed_epoch_ts). safe_epoch_ts of the first row is NULL, because safe_epoch=0 is not a valid epoch.

dev=> select * from rw_catalog.rw_meta_snapshot;
 meta_snapshot_id | hummock_version_id |    safe_epoch    |      safe_epoch_ts      | max_committed_epoch | max_committed_epoch_ts  
------------------+--------------------+------------------+-------------------------+---------------------+-------------------------
                1 |                  5 |                0 |                         |    3551844974919680 | 2022-12-19 06:40:53.255
                2 |                  9 | 3551845695750144 | 2022-12-19 06:41:04.254 |    3551847139508224 | 2022-12-19 06:41:26.284
(2 rows)

As the backup manifest is small enough, I make the frontend query it directly from the meta node.
Alternatively, we could make the frontend read the backup manifest from backup storage instead of the meta node, but that doesn't bring significant benefit now, so it's not adopted.
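Putting the pieces of this comment together, a time-travel session might look like the following sketch (the epoch value is illustrative, taken from the sample output above):

```sql
-- List available backups and their epoch/timestamp ranges.
SELECT * FROM rw_catalog.rw_meta_snapshot;

-- Pin subsequent batch queries to an epoch covered by a backup.
SET QUERY_EPOCH TO 3551845695750144;
SELECT count(*) FROM t;

-- Back to querying the latest data (0 is the default).
SET QUERY_EPOCH TO 0;
```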

Contributor Author

Currently, backup creation and deletion are still done via risectl. We can also support create backup and delete backup <id> SQL statements later.

];

// MUST HAVE 1v1 relationship to CONFIG_KEYS. e.g. CONFIG_KEYS[IMPLICIT_FLUSH] =
@@ -52,6 +54,7 @@ const BATCH_ENABLE_LOOKUP_JOIN: usize = 6;
const MAX_SPLIT_RANGE_GAP: usize = 7;
const SEARCH_PATH: usize = 8;
const TRANSACTION_ISOLATION_LEVEL: usize = 9;
const QUERY_EPOCH: usize = 10;

trait ConfigEntry: Default + for<'a> TryFrom<&'a [&'a str], Error = RwError> {
fn entry_name() -> &'static str;
@@ -184,6 +187,51 @@ impl<const NAME: usize, const DEFAULT: i32> TryFrom<&[&str]> for ConfigI32<NAME,
}
}

struct ConfigU64<const NAME: usize, const DEFAULT: u64 = 0>(u64);

impl<const NAME: usize, const DEFAULT: u64> Default for ConfigU64<NAME, DEFAULT> {
fn default() -> Self {
ConfigU64(DEFAULT)
}
}

impl<const NAME: usize, const DEFAULT: u64> Deref for ConfigU64<NAME, DEFAULT> {
type Target = u64;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<const NAME: usize, const DEFAULT: u64> ConfigEntry for ConfigU64<NAME, DEFAULT> {
fn entry_name() -> &'static str {
CONFIG_KEYS[NAME]
}
}

impl<const NAME: usize, const DEFAULT: u64> TryFrom<&[&str]> for ConfigU64<NAME, DEFAULT> {
type Error = RwError;

fn try_from(value: &[&str]) -> Result<Self, Self::Error> {
if value.len() != 1 {
return Err(ErrorCode::InternalError(format!(
"SET {} takes only one argument",
Self::entry_name()
))
.into());
}

let s = value[0];
s.parse::<u64>().map(ConfigU64).map_err(|_e| {
ErrorCode::InvalidConfigValue {
config_entry: Self::entry_name().to_string(),
config_value: s.to_string(),
}
.into()
})
}
}
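The TryFrom impl above boils down to a small parsing rule: exactly one argument, and it must parse as a u64. A standalone sketch of that rule (names and error strings are illustrative, not RisingWave's API):

```rust
/// Minimal sketch of the ConfigU64 parsing rule: `SET <entry> TO <value>`
/// accepts exactly one argument, which must be a valid u64.
fn parse_u64_config(entry_name: &str, value: &[&str]) -> Result<u64, String> {
    if value.len() != 1 {
        // Mirrors the "takes only one argument" error path.
        return Err(format!("SET {} takes only one argument", entry_name));
    }
    value[0]
        .parse::<u64>()
        .map_err(|_| format!("invalid value {:?} for {}", value[0], entry_name))
}
```

Negative numbers and non-numeric strings fall out of `parse::<u64>()` automatically, which is why the real impl only needs to map the parse error into InvalidConfigValue.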

pub struct VariableInfo {
pub name: String,
pub setting: String,
@@ -198,6 +246,7 @@ type ExtraFloatDigit = ConfigI32<EXTRA_FLOAT_DIGITS, 1>;
type DateStyle = ConfigString<DATE_STYLE>;
type BatchEnableLookupJoin = ConfigBool<BATCH_ENABLE_LOOKUP_JOIN, true>;
type MaxSplitRangeGap = ConfigI32<MAX_SPLIT_RANGE_GAP, 8>;
type QueryEpoch = ConfigU64<QUERY_EPOCH, 0>;

#[derive(Default)]
pub struct ConfigMap {
@@ -234,6 +283,9 @@ pub struct ConfigMap {

/// see <https://www.postgresql.org/docs/current/transaction-iso.html>
transaction_isolation_level: IsolationLevel,

/// select as of specific epoch
query_epoch: QueryEpoch,
}

impl ConfigMap {
@@ -257,6 +309,8 @@ impl ConfigMap {
self.max_split_range_gap = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(SearchPath::entry_name()) {
self.search_path = val.as_slice().try_into()?;
} else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
self.query_epoch = val.as_slice().try_into()?;
} else {
return Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into());
}
@@ -285,6 +339,8 @@ impl ConfigMap {
Ok(self.search_path.to_string())
} else if key.eq_ignore_ascii_case(IsolationLevel::entry_name()) {
Ok(self.transaction_isolation_level.to_string())
} else if key.eq_ignore_ascii_case(QueryEpoch::entry_name()) {
Ok(self.query_epoch.to_string())
} else {
Err(ErrorCode::UnrecognizedConfigurationParameter(key.to_string()).into())
}
@@ -336,6 +392,11 @@ impl ConfigMap {
name: SearchPath::entry_name().to_lowercase(),
setting : self.search_path.to_string(),
description : String::from("Sets the order in which schemas are searched when an object (table, data type, function, etc.) is referenced by a simple name with no schema specified")
},
VariableInfo {
name: QueryEpoch::entry_name().to_lowercase(),
setting : self.query_epoch.to_string(),
description : String::from("Sets the historical epoch for querying data. If 0, querying latest data.")
}
]
}
@@ -379,4 +440,11 @@ impl ConfigMap {
pub fn get_search_path(&self) -> SearchPath {
self.search_path.clone()
}

pub fn get_query_epoch(&self) -> Option<Epoch> {
if self.query_epoch.0 != 0 {
return Some((self.query_epoch.0).into());
}
None
}
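The getter treats 0 as "no historical epoch set", matching the default and the documented "If 0, querying latest data" behavior. A condensed sketch of that sentinel logic (Epoch simplified to a bare u64):

```rust
/// 0 is the default and means "query the latest data"; any non-zero
/// value is interpreted as a historical epoch to query against.
fn query_epoch(raw: u64) -> Option<u64> {
    if raw != 0 { Some(raw) } else { None }
}
```

Returning Option<Epoch> rather than the raw u64 lets callers in the batch path distinguish "time-travel requested" from the normal latest-snapshot read without re-checking the sentinel.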
}
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
@@ -116,7 +116,7 @@ pub async fn compute_node_serve(
let state_store = StateStoreImpl::new(
&opts.state_store,
&opts.file_cache_dir,
storage_config.clone(),
&config,
hummock_meta_client.clone(),
state_store_metrics.clone(),
object_store_metrics,
8 changes: 6 additions & 2 deletions src/ctl/src/common/hummock_service.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, bail, Result};
use risingwave_common::config::StorageConfig;
use risingwave_common::config::{RwConfig, StorageConfig};
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::{HummockStorage, TieredCacheMetricsBuilder};
@@ -100,6 +100,10 @@ For `./risedev apply-compose-deploy` users,
share_buffer_compaction_worker_threads_number: 0,
..Default::default()
};
let rw_config = RwConfig {
storage: config.clone(),
..Default::default()
};

tracing::info!("using Hummock config: {:#?}", config);

@@ -112,7 +116,7 @@
let state_store_impl = StateStoreImpl::new(
&self.hummock_url,
"",
Arc::new(config),
&rw_config,
Arc::new(MonitoredHummockMetaClient::new(
meta_client.clone(),
metrics.hummock_metrics.clone(),