fix(meta): use per table mce and safe epoch in metadata backup (#17227)
zwang28 authored Jun 13, 2024
1 parent 4d32c18 commit 3b823d0
Showing 9 changed files with 113 additions and 60 deletions.
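
In short: instead of relying on the snapshot-wide max_committed_epoch and safe_epoch, the backup manifest now records a per-table state_table_info map, and backup reads validate the requested epoch against the entry for the queried table, falling back to the snapshot-wide range for older backups that lack the map. A minimal sketch of that coverage check, with simplified stand-in types; the real code in src/storage/src/hummock/backup_reader.rs below uses TableId and PbStateTableInfo:

use std::collections::HashMap;

// Stand-in for PbStateTableInfo: the per-table epochs recorded in the manifest.
struct TableEpochs {
    committed_epoch: u64,
    safe_epoch: u64,
}

// A snapshot covers `epoch` for `table_id` iff the table's own epoch range does;
// an empty map means an old-format backup, so fall back to the snapshot-wide range.
fn snapshot_covers(
    per_table: &HashMap<u32, TableEpochs>,
    snapshot_safe_epoch: u64,
    snapshot_mce: u64,
    table_id: u32,
    epoch: u64,
) -> bool {
    if per_table.is_empty() {
        return epoch >= snapshot_safe_epoch && epoch <= snapshot_mce;
    }
    per_table
        .get(&table_id)
        .map_or(false, |t| epoch >= t.safe_epoch && epoch <= t.committed_epoch)
}

fn main() {
    let mut per_table = HashMap::new();
    // Hypothetical table id and epochs, for illustration only.
    per_table.insert(1002, TableEpochs { committed_epoch: 65536, safe_epoch: 0 });
    assert!(snapshot_covers(&per_table, 0, 0, 1002, 65536));
    assert!(!snapshot_covers(&per_table, 0, 0, 9999, 65536));
}
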
3 changes: 3 additions & 0 deletions proto/backup_service.proto
@@ -2,6 +2,8 @@ syntax = "proto3";

package backup_service;

import "hummock.proto";

option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

@@ -50,6 +52,7 @@ message MetaSnapshotMetadata {
optional uint32 format_version = 5;
optional string remarks = 6;
optional string rw_version = 7;
map<uint32, hummock.StateTableInfo> state_table_info = 8;
}

service BackupService {
@@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::{Fields, Timestamp};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::types::{Fields, JsonbVal};
use risingwave_frontend_macro::system_catalog;
use serde_json::json;

use crate::catalog::system_catalog::SysCatalogReaderImpl;
use crate::error::Result;
@@ -24,30 +24,13 @@ struct RwMetaSnapshot {
#[primary_key]
meta_snapshot_id: i64,
hummock_version_id: i64,
// the smallest epoch this meta snapshot includes
safe_epoch: i64,
// human-readable timestamp of safe_epoch
safe_epoch_ts: Option<Timestamp>,
// the largest epoch this meta snapshot includes
max_committed_epoch: i64,
// human-readable timestamp of max_committed_epoch
max_committed_epoch_ts: Option<Timestamp>,
remarks: Option<String>,
state_table_info: Option<JsonbVal>,
rw_version: Option<String>,
}

#[system_catalog(table, "rw_catalog.rw_meta_snapshot")]
async fn read_meta_snapshot(reader: &SysCatalogReaderImpl) -> Result<Vec<RwMetaSnapshot>> {
let try_get_date_time = |epoch: u64| {
if epoch == 0 {
return None;
}
let time_millis = Epoch::from(epoch).as_unix_millis();
Timestamp::with_secs_nsecs(
(time_millis / 1000) as i64,
(time_millis % 1000 * 1_000_000) as u32,
)
.ok()
};
let meta_snapshots = reader
.meta_client
.list_meta_snapshots()
@@ -56,11 +39,9 @@ async fn read_meta_snapshot(reader: &SysCatalogReaderImpl) -> Result<Vec<RwMetaSnapshot>> {
.map(|s| RwMetaSnapshot {
meta_snapshot_id: s.id as _,
hummock_version_id: s.hummock_version_id as _,
safe_epoch: s.safe_epoch as _,
safe_epoch_ts: try_get_date_time(s.safe_epoch),
max_committed_epoch: s.max_committed_epoch as _,
max_committed_epoch_ts: try_get_date_time(s.max_committed_epoch),
remarks: s.remarks,
state_table_info: Some(json!(s.state_table_info).into()),
rw_version: s.rw_version.clone(),
})
.collect();
Ok(meta_snapshots)
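
The four epoch and timestamp columns are folded into a single JSONB column. For illustration, a sketch of the shape that json!(s.state_table_info) produces when rw_catalog.rw_meta_snapshot is queried: keys are table ids, and the value field names (committedEpoch, safeEpoch) are assumed from the manifest sed patterns in the integration tests below; the table id 1002 and the epoch values are hypothetical.

use serde_json::json;

fn main() {
    // Hypothetical content, shown only to illustrate the JSONB shape of state_table_info.
    let state_table_info = json!({
        "1002": { "committedEpoch": "65536", "safeEpoch": "0" }
    });
    println!("{state_table_info}");
}
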
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/meta_snapshot_builder_v2.rs
@@ -33,7 +33,7 @@ fn map_db_err(e: DbErr) -> BackupError {

macro_rules! define_set_metadata {
($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
pub async fn set_metadata(
async fn set_metadata(
metadata: &mut MetadataV2,
txn: &sea_orm::DatabaseTransaction,
) -> BackupResult<()> {
46 changes: 29 additions & 17 deletions src/meta/src/backup_restore/restore_impl/v2.rs
@@ -93,28 +93,40 @@ impl WriterModelV2ToMetaStoreV2 {
}
}

macro_rules! define_write_model_v2_to_meta_store_v2 {
($( {$name:ident, $mod_path:ident::$mod_name:ident} ),*) => {
async fn write_model_v2_to_meta_store_v2(
metadata: &risingwave_backup::meta_snapshot_v2::MetadataV2,
db: &sea_orm::DatabaseConnection,
) -> BackupResult<()> {
$(
insert_models(metadata.$name.clone(), db).await?;
)*
Ok(())
}
};
}

risingwave_backup::for_all_metadata_models_v2!(define_write_model_v2_to_meta_store_v2);

#[async_trait::async_trait]
impl Writer<MetadataV2> for WriterModelV2ToMetaStoreV2 {
async fn write(&self, target_snapshot: MetaSnapshot<MetadataV2>) -> BackupResult<()> {
let metadata = target_snapshot.metadata;
let db = &self.meta_store.conn;
write_model_v2_to_meta_store_v2(&metadata, db).await?;
insert_models(metadata.seaql_migrations.clone(), db).await?;
insert_models(metadata.clusters.clone(), db).await?;
insert_models(metadata.version_stats.clone(), db).await?;
insert_models(metadata.compaction_configs.clone(), db).await?;
insert_models(metadata.hummock_sequences.clone(), db).await?;
insert_models(metadata.workers.clone(), db).await?;
insert_models(metadata.worker_properties.clone(), db).await?;
insert_models(metadata.users.clone(), db).await?;
insert_models(metadata.user_privileges.clone(), db).await?;
insert_models(metadata.objects.clone(), db).await?;
insert_models(metadata.object_dependencies.clone(), db).await?;
insert_models(metadata.databases.clone(), db).await?;
insert_models(metadata.schemas.clone(), db).await?;
insert_models(metadata.streaming_jobs.clone(), db).await?;
insert_models(metadata.fragments.clone(), db).await?;
insert_models(metadata.actors.clone(), db).await?;
insert_models(metadata.actor_dispatchers.clone(), db).await?;
insert_models(metadata.connections.clone(), db).await?;
insert_models(metadata.sources.clone(), db).await?;
insert_models(metadata.tables.clone(), db).await?;
insert_models(metadata.sinks.clone(), db).await?;
insert_models(metadata.views.clone(), db).await?;
insert_models(metadata.indexes.clone(), db).await?;
insert_models(metadata.functions.clone(), db).await?;
insert_models(metadata.system_parameters.clone(), db).await?;
insert_models(metadata.catalog_versions.clone(), db).await?;
insert_models(metadata.subscriptions.clone(), db).await?;
insert_models(metadata.session_parameters.clone(), db).await?;
insert_models(metadata.secrets.clone(), db).await?;
// update_auto_inc must be called last.
update_auto_inc(&metadata, db).await?;
Ok(())
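
The hand-written list of insert_models calls is replaced by a macro generated through risingwave_backup::for_all_metadata_models_v2!, so the set of metadata models is declared in one place and every writer stays in sync with it. A rough sketch of this callback-macro pattern, not the actual macro; the model list and matcher are simplified here:

// The "for_all" macro owns the list of {field, module} pairs and hands it to
// whichever macro the caller passes in.
macro_rules! for_all_metadata_models_v2_sketch {
    ($macro:ident) => {
        $macro! {
            {seaql_migrations, serde_seaql_migration},
            {clusters, cluster}
        }
    };
}

// The callback decides what to generate per pair; here it just collects field names.
macro_rules! define_writer_sketch {
    ($( {$name:ident, $mod_name:ident} ),*) => {
        fn metadata_field_names() -> Vec<&'static str> {
            vec![$( stringify!($name) ),*]
        }
    };
}

for_all_metadata_models_v2_sketch!(define_writer_sketch);

fn main() {
    // Prints ["seaql_migrations", "clusters"]: one entry per declared model.
    println!("{:?}", metadata_field_names());
}

Adding a new metadata model then only requires touching the central list, not each backup or restore writer.
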
14 changes: 5 additions & 9 deletions src/storage/backup/integration_tests/common.sh
@@ -156,15 +156,15 @@ function execute_sql_and_expect() {
}

function get_max_committed_epoch() {
mce=$(${BACKUP_TEST_RW_ALL_IN_ONE} risectl hummock list-version --verbose 2>&1 | grep max_committed_epoch | sed -n 's/^.*max_committed_epoch: \(.*\),/\1/p')
mce=$(${BACKUP_TEST_RW_ALL_IN_ONE} risectl hummock list-version --verbose 2>&1 | grep committed_epoch | sed -n 's/^.*committed_epoch: \(.*\),/\1/p')
# always take the smallest one
echo "${mce}"|sort -n |head -n 1
}

function get_safe_epoch() {
safe_epoch=$(${BACKUP_TEST_RW_ALL_IN_ONE} risectl hummock list-version --verbose 2>&1 | grep safe_epoch | sed -n 's/^.*safe_epoch: \(.*\),/\1/p')
# always take the smallest one
echo "${safe_epoch}"|sort -n |head -n 1
# always take the largest one
echo "${safe_epoch}"|sort -n -r |head -n 1
}

function get_total_sst_count() {
@@ -173,17 +173,13 @@ function get_total_sst_count() {
}

function get_max_committed_epoch_in_backup() {
local id
id=$1
sed_str="s/.*{\"id\":${id},\"hummock_version_id\":.*,\"ssts\":\[.*\],\"max_committed_epoch\":\([[:digit:]]*\),\"safe_epoch\":.*}.*/\1/p"
sed_str="s/.*\"state_table_info\":{\"[[:digit:]]*\":{\"committedEpoch\":\"\([[:digit:]]*\)\",\"safeEpoch\":\"\([[:digit:]]*\)\"}.*/\1/p"
${BACKUP_TEST_MCLI} -C "${BACKUP_TEST_MCLI_CONFIG}" \
cat "hummock-minio/hummock001/backup/manifest.json" | sed -n "${sed_str}"
}

function get_safe_epoch_in_backup() {
local id
id=$1
sed_str="s/.*{\"id\":${id},\"hummock_version_id\":.*,\"ssts\":\[.*\],\"max_committed_epoch\":.*,\"safe_epoch\":\([[:digit:]]*\).*}.*/\1/p"
sed_str="s/.*\"state_table_info\":{\"[[:digit:]]*\":{\"committedEpoch\":\"\([[:digit:]]*\)\",\"safeEpoch\":\"\([[:digit:]]*\)\"}.*/\2/p"
${BACKUP_TEST_MCLI} -C "${BACKUP_TEST_MCLI_CONFIG}" \
cat "hummock-minio/hummock001/backup/manifest.json" | sed -n "${sed_str}"
}
10 changes: 5 additions & 5 deletions src/storage/backup/integration_tests/test_query_backup.sh
@@ -24,8 +24,8 @@ select * from t1;

job_id=$(backup)
echo "${job_id}"
backup_mce=$(get_max_committed_epoch_in_backup "${job_id}")
backup_safe_epoch=$(get_safe_epoch_in_backup "${job_id}")
backup_mce=$(get_max_committed_epoch_in_backup)
backup_safe_epoch=$(get_safe_epoch_in_backup)
echo "backup MCE: ${backup_mce}"
echo "backup safe_epoch: ${backup_safe_epoch}"

@@ -55,10 +55,10 @@ do
sleep 5
min_pinned_snapshot=$(get_min_pinned_snapshot)
done
# safe epoch equals to 0 because no compaction has been done
# safe epoch equals to backup_safe_epoch because no compaction has been done
safe_epoch=$(get_safe_epoch)
echo "safe epoch after unpin: ${safe_epoch}"
[ "${safe_epoch}" -eq 0 ]
[ "${safe_epoch}" -eq "${backup_safe_epoch}" ]
# trigger a compaction to increase safe_epoch
manual_compaction -c 3 -l 0
# wait until compaction is done
@@ -68,6 +68,7 @@ do
sleep 5
done
echo "safe epoch after compaction: ${safe_epoch}"
[ "${safe_epoch}" -gt "${backup_safe_epoch}" ]

echo "QUERY_EPOCH=safe_epoch. It should fail because it's not covered by any backup"
execute_sql_and_expect \
@@ -83,7 +84,6 @@ select * from t1;" \

echo "QUERY_EPOCH=backup_safe_epoch + 1<<16 + 1, it's < safe_epoch but covered by backup"
epoch=$((backup_safe_epoch + (1<<16) + 1))
[ ${epoch} -eq 65537 ]
execute_sql_and_expect \
"SET QUERY_EPOCH TO ${epoch};
select * from t1;" \
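
The removed assertion [ ${epoch} -eq 65537 ] only held while backup_safe_epoch was 0, which is no longer guaranteed now that the safe epoch is taken per table. For context, a small sketch of the epoch arithmetic, assuming the usual Hummock layout in which the low 16 bits of an epoch hold a sequence number and the higher bits hold physical time:

fn main() {
    // Illustrative value; the real backup_safe_epoch is read from the backup manifest.
    let backup_safe_epoch: u64 = 0;
    // (1 << 16) bumps the physical-time part by one unit and the +1 lands just past it,
    // giving an epoch strictly greater than backup_safe_epoch but, in this test, still
    // below the post-compaction safe_epoch and therefore only readable from the backup.
    let query_epoch = backup_safe_epoch + (1 << 16) + 1;
    assert!(query_epoch > backup_safe_epoch);
    assert_eq!(query_epoch, 65537); // holds only when backup_safe_epoch == 0
}
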
49 changes: 48 additions & 1 deletion src/storage/backup/src/lib.rs
@@ -32,14 +32,16 @@ pub mod meta_snapshot_v1;
pub mod meta_snapshot_v2;
pub mod storage;

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::hash::Hasher;

use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::RW_VERSION;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId};
use risingwave_pb::backup_service::{PbMetaSnapshotManifest, PbMetaSnapshotMetadata};
use risingwave_pb::hummock::PbStateTableInfo;
use serde::{Deserialize, Serialize};

use crate::error::{BackupError, BackupResult};
@@ -58,6 +60,8 @@ pub struct MetaSnapshotMetadata {
#[serde(default)]
pub format_version: u32,
pub remarks: Option<String>,
#[serde(with = "table_id_key_map")]
pub state_table_info: HashMap<TableId, PbStateTableInfo>,
pub rw_version: Option<String>,
}

@@ -76,6 +80,7 @@ impl MetaSnapshotMetadata {
safe_epoch: v.visible_table_safe_epoch(),
format_version,
remarks,
state_table_info: v.state_table_info.info().clone(),
rw_version: Some(RW_VERSION.to_owned()),
}
}
@@ -115,6 +120,11 @@ impl From<&MetaSnapshotMetadata> for PbMetaSnapshotMetadata {
safe_epoch: m.safe_epoch,
format_version: Some(m.format_version),
remarks: m.remarks.clone(),
state_table_info: m
.state_table_info
.iter()
.map(|(t, i)| (t.table_id, i.clone()))
.collect(),
rw_version: m.rw_version.clone(),
}
}
@@ -128,3 +138,40 @@ impl From<&MetaSnapshotManifest> for PbMetaSnapshotManifest {
}
}
}

mod table_id_key_map {
use std::collections::HashMap;
use std::str::FromStr;

use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::PbStateTableInfo;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

pub fn serialize<S>(
map: &HashMap<TableId, PbStateTableInfo>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let map_as_str: HashMap<String, &PbStateTableInfo> =
map.iter().map(|(k, v)| (k.to_string(), v)).collect();
map_as_str.serialize(serializer)
}

pub fn deserialize<'de, D>(
deserializer: D,
) -> Result<HashMap<TableId, PbStateTableInfo>, D::Error>
where
D: Deserializer<'de>,
{
let map_as_str: HashMap<String, PbStateTableInfo> = HashMap::deserialize(deserializer)?;
map_as_str
.into_iter()
.map(|(k, v)| {
let key = u32::from_str(&k).map_err(serde::de::Error::custom)?;
Ok((TableId::new(key), v))
})
.collect()
}
}
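
The custom table_id_key_map module exists because JSON object keys must be strings while the in-memory map is keyed by TableId, so serialization renders each key via to_string and deserialization parses it back with u32::from_str. A minimal usage sketch under that assumption; Example is a made-up struct, and the snippet presumes the serde impls for PbStateTableInfo that are available where this module lives:

use std::collections::HashMap;

use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::PbStateTableInfo;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Example {
    // Same opt-in used by MetaSnapshotMetadata above.
    #[serde(with = "table_id_key_map")]
    state_table_info: HashMap<TableId, PbStateTableInfo>,
}

fn main() -> Result<(), serde_json::Error> {
    let mut info = HashMap::new();
    info.insert(TableId::new(1002), PbStateTableInfo::default());
    let example = Example { state_table_info: info };
    // Keys round-trip through their string form, e.g. {"state_table_info":{"1002":{...}}}.
    let encoded = serde_json::to_string(&example)?;
    let decoded: Example = serde_json::from_str(&encoded)?;
    assert_eq!(decoded.state_table_info.len(), 1);
    Ok(())
}
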
12 changes: 11 additions & 1 deletion src/storage/src/hummock/backup_reader.rs
@@ -25,6 +25,7 @@ use risingwave_backup::error::BackupError;
use risingwave_backup::meta_snapshot::{MetaSnapshot, Metadata};
use risingwave_backup::storage::{MetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
use risingwave_backup::{meta_snapshot_v1, meta_snapshot_v2, MetaSnapshotId};
use risingwave_common::catalog::TableId;
use risingwave_common::config::ObjectStoreConfig;
use risingwave_common::system_param::local_manager::SystemParamsReaderRef;
use risingwave_common::system_param::reader::SystemParamsRead;
@@ -182,6 +183,7 @@ impl BackupReader {
/// Otherwise, reading the version may encounter object store error, due to SST absence.
pub async fn try_get_hummock_version(
self: &BackupReaderRef,
table_id: TableId,
epoch: u64,
) -> StorageResult<Option<PinnedVersion>> {
// Use the same store throughout the call.
@@ -192,7 +194,15 @@
.manifest()
.snapshot_metadata
.iter()
.find(|v| epoch >= v.safe_epoch && epoch <= v.max_committed_epoch)
.find(|v| {
if v.state_table_info.is_empty() {
return epoch >= v.safe_epoch && epoch <= v.max_committed_epoch;
}
if let Some(m) = v.state_table_info.get(&table_id) {
return epoch >= m.safe_epoch && epoch <= m.committed_epoch;
}
false
})
.cloned()
else {
return Ok(None);
6 changes: 5 additions & 1 deletion src/storage/src/hummock/store/hummock_storage.rs
@@ -311,7 +311,11 @@ impl HummockStorage {
table_id: TableId,
key_range: TableKeyRange,
) -> StorageResult<(TableKeyRange, ReadVersionTuple)> {
match self.backup_reader.try_get_hummock_version(epoch).await {
match self
.backup_reader
.try_get_hummock_version(table_id, epoch)
.await
{
Ok(Some(backup_version)) => {
validate_safe_epoch(backup_version.version(), table_id, epoch)?;

