Skip to content

Commit

Permalink
extract state table info struct
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Jun 5, 2024
1 parent 37df492 commit ed2b4cc
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ async fn read_hummock_snapshot_groups(
let version = reader.meta_client.get_hummock_current_version().await?;
Ok(version
.state_table_info
.info()
.iter()
.map(|(table_id, info)| RwHummockSnapshot {
table_id: table_id.table_id as _,
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/manager/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl<'a> HummockVersionTransaction<'a> {
version_delta.state_table_info_delta = version_delta
.latest_version()
.state_table_info
.info()
.iter()
.map(|(table_id, info)| {
(
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
};
use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId};
use risingwave_hummock_sdk::table_stats::add_prost_table_stats_map;
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta, HummockVersionStateTableInfo};
use risingwave_hummock_sdk::{
CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId,
FIRST_VERSION_ID,
Expand Down Expand Up @@ -355,7 +355,7 @@ pub(super) fn create_init_version(default_compaction_config: CompactionConfig) -
safe_epoch: INVALID_EPOCH,
table_watermarks: HashMap::new(),
table_change_log: HashMap::new(),
state_table_info: HashMap::new(),
state_table_info: HummockVersionStateTableInfo::empty(),
};
for group_id in [
StaticCompactionGroupId::StateDefault as CompactionGroupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_pb::hummock::table_watermarks::PbEpochNewWatermarks;
use risingwave_pb::hummock::{
CompactionConfig, CompatibilityVersion, GroupConstruct, GroupDestroy, GroupMetaChange,
GroupTableChange, Level, LevelType, OverlappingLevel, PbLevelType, PbTableWatermarks,
SstableInfo, StateTableInfo,
SstableInfo,
};
use tracing::warn;

Expand Down Expand Up @@ -254,17 +254,6 @@ impl HummockVersion {
}
}

pub type SstSplitInfo = (
// Object id.
HummockSstableObjectId,
// SST id.
HummockSstableId,
// Old SST id in parent group.
HummockSstableId,
// New SST id in parent group.
HummockSstableId,
);

impl HummockVersion {
pub fn count_new_ssts_in_group_split(
&self,
Expand Down Expand Up @@ -485,6 +474,11 @@ impl HummockVersion {
pub fn apply_version_delta(&mut self, version_delta: &HummockVersionDelta) {
assert_eq!(self.id, version_delta.prev_id);

let changed_table_info = self.state_table_info.apply_delta(
&version_delta.state_table_info_delta,
&version_delta.removed_table_ids,
);

// apply to `levels`, which is different compaction groups
for (compaction_group_id, group_deltas) in &version_delta.group_deltas {
let summary = summarize_group_deltas(group_deltas);
Expand Down Expand Up @@ -614,7 +608,7 @@ impl HummockVersion {
}
for (table_id, table_watermarks) in &self.table_watermarks {
if let Some(table_delta) = version_delta.state_table_info_delta.get(table_id)
&& let Some(prev_table) = self.state_table_info.get(table_id)
&& let Some(Some(prev_table)) = changed_table_info.get(table_id)
&& table_delta.safe_epoch > prev_table.safe_epoch
{
// safe epoch has progressed, need further clear.
Expand Down Expand Up @@ -667,7 +661,7 @@ impl HummockVersion {
return false;
}
if let Some(table_info_delta) = version_delta.state_table_info_delta.get(table_id)
&& let Some(prev_table_info) = self.state_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
&& let Some(Some(prev_table_info)) = changed_table_info.get(table_id) && table_info_delta.committed_epoch > prev_table_info.committed_epoch {
// the table exists previously, and its committed epoch has progressed.
} else {
// otherwise, the table change log should be kept anyway
Expand All @@ -690,17 +684,6 @@ impl HummockVersion {
change_log.truncate(change_log_delta.truncate_epoch);
}
}

// apply the state table info delta
for (table_id, delta) in &version_delta.state_table_info_delta {
self.state_table_info.insert(
*table_id,
StateTableInfo {
committed_epoch: delta.committed_epoch,
safe_epoch: delta.safe_epoch,
},
);
}
}

pub fn build_compaction_group_info(&self) -> HashMap<TableId, CompactionGroupId> {
Expand Down
96 changes: 82 additions & 14 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,98 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::mem::size_of;
use std::mem::{replace, size_of};
use std::sync::Arc;

use itertools::Itertools;
use prost::Message;
use risingwave_common::catalog::TableId;
use risingwave_pb::hummock::group_delta::DeltaType;
use risingwave_pb::hummock::hummock_version::Levels as PbLevels;
use risingwave_pb::hummock::hummock_version_delta::{ChangeLogDelta, GroupDeltas as PbGroupDeltas};
use risingwave_pb::hummock::{
HummockVersion as PbHummockVersion, HummockVersionDelta as PbHummockVersionDelta, SstableInfo,
StateTableInfo as PbStateTableInfo, StateTableInfoDelta,
StateTableInfo as PbStateTableInfo, StateTableInfo, StateTableInfoDelta,
};
use tracing::{info, warn};

use crate::change_log::TableChangeLog;
use crate::table_watermark::TableWatermarks;
use crate::{CompactionGroupId, HummockSstableObjectId, HummockVersionId};

#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersionStateTableInfo {
state_table_info: HashMap<TableId, PbStateTableInfo>,
}

impl HummockVersionStateTableInfo {
pub fn empty() -> Self {
Self {
state_table_info: HashMap::new(),
}
}

pub fn from_protobuf(state_table_info: &HashMap<u32, PbStateTableInfo>) -> Self {
let state_table_info = state_table_info
.iter()
.map(|(table_id, info)| (TableId::new(*table_id), info.clone()))
.collect();
Self { state_table_info }
}

pub fn to_protobuf(&self) -> HashMap<u32, PbStateTableInfo> {
self.state_table_info
.iter()
.map(|(table_id, info)| (table_id.table_id, info.clone()))
.collect()
}

pub fn apply_delta(
&mut self,
delta: &HashMap<TableId, StateTableInfoDelta>,
removed_table_id: &HashSet<TableId>,
) -> HashMap<TableId, Option<StateTableInfo>> {
let mut changed_table = HashMap::new();
info!(removed_table_id = ?removed_table_id.iter().map(|table_id| table_id.table_id).collect_vec(), "removing table id");
for table_id in removed_table_id {
if let Some(prev_info) = self.state_table_info.remove(table_id) {
assert!(changed_table.insert(*table_id, Some(prev_info)).is_none());
} else {
warn!(
table_id = table_id.table_id,
"table to remove does not exist"
);
}
}
for (table_id, delta) in delta {
if removed_table_id.contains(table_id) {
continue;
}
let new_info = StateTableInfo {
committed_epoch: delta.committed_epoch,
safe_epoch: delta.safe_epoch,
};
match self.state_table_info.entry(*table_id) {
Entry::Occupied(mut entry) => {
let prev_info = replace(entry.get_mut(), new_info);
changed_table.insert(*table_id, Some(prev_info));
}
Entry::Vacant(entry) => {
entry.insert(new_info);
changed_table.insert(*table_id, None);
}
}
}
changed_table
}

pub fn info(&self) -> &HashMap<TableId, StateTableInfo> {
&self.state_table_info
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct HummockVersion {
pub id: u64,
Expand All @@ -38,7 +112,7 @@ pub struct HummockVersion {
pub safe_epoch: u64,
pub table_watermarks: HashMap<TableId, Arc<TableWatermarks>>,
pub table_change_log: HashMap<TableId, TableChangeLog>,
pub state_table_info: HashMap<TableId, PbStateTableInfo>,
pub state_table_info: HummockVersionStateTableInfo,
}

impl Default for HummockVersion {
Expand Down Expand Up @@ -90,11 +164,9 @@ impl HummockVersion {
)
})
.collect(),
state_table_info: pb_version
.state_table_info
.iter()
.map(|(table_id, info)| (TableId::new(*table_id), info.clone()))
.collect(),
state_table_info: HummockVersionStateTableInfo::from_protobuf(
&pb_version.state_table_info,
),
}
}

Expand All @@ -118,11 +190,7 @@ impl HummockVersion {
.iter()
.map(|(table_id, change_log)| (table_id.table_id, change_log.to_protobuf()))
.collect(),
state_table_info: self
.state_table_info
.iter()
.map(|(table_id, info)| (table_id.table_id(), info.clone()))
.collect(),
state_table_info: self.state_table_info.to_protobuf(),
}
}

Expand All @@ -147,7 +215,7 @@ impl HummockVersion {

pub fn need_fill_backward_compatible_state_table_info_delta(&self) -> bool {
// state_table_info is not previously filled, but there previously exists some tables
self.state_table_info.is_empty()
self.state_table_info.state_table_info.is_empty()
&& self
.levels
.values()
Expand Down
2 changes: 1 addition & 1 deletion src/storage/hummock_test/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub async fn prepare_first_valid_version(
};

(
PinnedVersion::new(hummock_version, unbounded_channel().0),
PinnedVersion::new(*hummock_version, unbounded_channel().0),
tx,
rx,
)
Expand Down
22 changes: 16 additions & 6 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ impl HummockEventHandler {

version_to_apply
}
HummockVersionUpdate::PinnedVersion(version) => version,
HummockVersionUpdate::PinnedVersion(version) => *version,
};

validate_table_key_range(&newly_pinned_version);
Expand Down Expand Up @@ -1127,7 +1127,9 @@ mod tests {
let version1 = make_new_version(epoch1);
{
version_update_tx
.send(HummockVersionUpdate::PinnedVersion(version1.clone()))
.send(HummockVersionUpdate::PinnedVersion(Box::new(
version1.clone(),
)))
.unwrap();
let (old_version, new_version, refill_finish_tx) = refill_task_rx.recv().await.unwrap();
assert_eq!(old_version.version(), initial_version.version());
Expand All @@ -1147,10 +1149,14 @@ mod tests {
let version3 = make_new_version(epoch3);
{
version_update_tx
.send(HummockVersionUpdate::PinnedVersion(version2.clone()))
.send(HummockVersionUpdate::PinnedVersion(Box::new(
version2.clone(),
)))
.unwrap();
version_update_tx
.send(HummockVersionUpdate::PinnedVersion(version3.clone()))
.send(HummockVersionUpdate::PinnedVersion(Box::new(
version3.clone(),
)))
.unwrap();
let (old_version2, new_version2, _refill_finish_tx2) =
refill_task_rx.recv().await.unwrap();
Expand Down Expand Up @@ -1180,11 +1186,15 @@ mod tests {
let mut rx = send_clear(epoch5);
assert_pending(&mut rx).await;
version_update_tx
.send(HummockVersionUpdate::PinnedVersion(version4.clone()))
.send(HummockVersionUpdate::PinnedVersion(Box::new(
version4.clone(),
)))
.unwrap();
assert_pending(&mut rx).await;
version_update_tx
.send(HummockVersionUpdate::PinnedVersion(version5.clone()))
.send(HummockVersionUpdate::PinnedVersion(Box::new(
version5.clone(),
)))
.unwrap();
rx.await.unwrap();
assert_eq!(latest_version.load().version(), &version5);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct BufferWriteRequest {
#[derive(Debug)]
pub enum HummockVersionUpdate {
VersionDeltas(Vec<HummockVersionDelta>),
PinnedVersion(HummockVersion),
PinnedVersion(Box<HummockVersion>),
}

pub enum HummockEvent {
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ impl ObserverState for HummockObserverNode {
);
let _ = self
.version_update_sender
.send(HummockVersionUpdate::PinnedVersion(
.send(HummockVersionUpdate::PinnedVersion(Box::new(
HummockVersion::from_rpc_protobuf(
&snapshot
.hummock_version
.expect("should get hummock version"),
),
))
)))
.inspect_err(|e| {
tracing::error!(event = ?e.0, "unable to send full version");
});
Expand Down
4 changes: 2 additions & 2 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl HummockStorage {
observer_manager.start().await;

let hummock_version = match version_update_rx.recv().await {
Some(HummockVersionUpdate::PinnedVersion(version)) => version,
Some(HummockVersionUpdate::PinnedVersion(version)) => *version,
_ => unreachable!("the hummock observer manager is the first one to take the event tx. Should be full hummock version")
};

Expand Down Expand Up @@ -653,7 +653,7 @@ impl HummockStorage {
use tokio::task::yield_now;
let version_id = version.id;
self._version_update_sender
.send(HummockVersionUpdate::PinnedVersion(version))
.send(HummockVersionUpdate::PinnedVersion(Box::new(version)))
.unwrap();
loop {
if self.pinned_version.load().id() >= version_id {
Expand Down

0 comments on commit ed2b4cc

Please sign in to comment.