Skip to content

Commit

Permalink
per table snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed May 28, 2024
1 parent e57ed35 commit 37df492
Show file tree
Hide file tree
Showing 20 changed files with 468 additions and 654 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 12 additions & 28 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,32 +137,6 @@ message TableWatermarks {
bool is_ascending = 2;
}

message SnapshotGroup {
uint32 table_fragments_id = 1;
repeated uint32 member_table_ids = 2;
uint64 committed_epoch = 3;
// Snapshots with epoch less than the safe epoch have been GCed.
// Reads against such an epoch will fail.
uint64 safe_epoch = 4;
}

message SnapshotGroupDelta {
message NewSnapshotGroup {
repeated uint32 member_table_ids = 1;
uint64 committed_epoch = 2;
uint64 safe_epoch = 3;
}

message DestroySnapshotGroup {}

oneof delta {
NewSnapshotGroup new_snapshot_group = 1;
uint64 new_committed_epoch = 2;
uint64 new_safe_epoch = 3;
DestroySnapshotGroup destroy = 4;
}
}

message EpochNewChangeLog {
repeated SstableInfo old_value = 1;
repeated SstableInfo new_value = 2;
Expand All @@ -175,6 +149,16 @@ message TableChangeLog {
repeated EpochNewChangeLog change_logs = 1;
}

// Per-entry epoch bookkeeping used as the value type of
// `HummockVersion.state_table_info` (a map keyed by uint32).
// NOTE(review): the map key is presumably the state table id — confirm
// against the code that populates the map.
message StateTableInfo {
// NOTE(review): presumably the latest epoch committed for this entry — confirm.
uint64 committed_epoch = 1;
// NOTE(review): presumably snapshots with an epoch below this value have been
// GCed and reads against them fail — confirm.
uint64 safe_epoch = 2;
}

// Value type of `HummockVersionDelta.state_table_info_delta` (a map keyed by
// uint32). Carries the same two epoch fields as `StateTableInfo`.
// NOTE(review): presumably these are the new values to apply to the matching
// `StateTableInfo` entry when the delta is applied — confirm.
message StateTableInfoDelta {
// NOTE(review): presumably the committed epoch after applying this delta — confirm.
uint64 committed_epoch = 1;
// NOTE(review): presumably the safe epoch after applying this delta — confirm.
uint64 safe_epoch = 2;
}

message HummockVersion {
message Levels {
repeated Level levels = 1;
Expand All @@ -192,7 +176,7 @@ message HummockVersion {
uint64 safe_epoch = 4;
map<uint32, TableWatermarks> table_watermarks = 5;
map<uint32, TableChangeLog> table_change_logs = 6;
map<uint32, SnapshotGroup> snapshot_groups = 7;
map<uint32, StateTableInfo> state_table_info = 7;
}

message HummockVersionDelta {
Expand All @@ -218,7 +202,7 @@ message HummockVersionDelta {
uint64 truncate_epoch = 2;
}
map<uint32, ChangeLogDelta> change_log_delta = 10;
map<uint32, SnapshotGroupDelta> snapshot_group_delta = 11;
map<uint32, StateTableInfoDelta> state_table_info_delta = 11;
}

message HummockVersionDeltas {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,31 +206,25 @@ async fn read_hummock_table_watermarks(
}

#[derive(Fields)]
struct RwHummockSnapshotGroups {
struct RwHummockSnapshot {
#[primary_key]
group_id: i32,
table_id: i32,
safe_epoch: i64,
committed_epoch: i64,
member_table_ids: Vec<i32>,
}

#[system_catalog(table, "rw_catalog.rw_hummock_snapshot_groups")]
#[system_catalog(table, "rw_catalog.rw_hummock_snapshot")]
async fn read_hummock_snapshot_groups(
reader: &SysCatalogReaderImpl,
) -> Result<Vec<RwHummockSnapshotGroups>> {
) -> Result<Vec<RwHummockSnapshot>> {
let version = reader.meta_client.get_hummock_current_version().await?;
Ok(version
.snapshot_groups
.state_table_info
.iter()
.map(|(group_id, group)| RwHummockSnapshotGroups {
group_id: u32::from(*group_id) as _,
safe_epoch: group.safe_epoch as _,
committed_epoch: group.committed_epoch as _,
member_table_ids: group
.member_table_ids
.iter()
.map(|table_id| table_id.table_id as _)
.collect(),
.map(|(table_id, info)| RwHummockSnapshot {
table_id: table_id.table_id as _,
committed_epoch: info.committed_epoch as _,
safe_epoch: info.safe_epoch as _,
})
.collect())
}
7 changes: 1 addition & 6 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,13 +587,8 @@ pub async fn start_service_as_election_leader(
.unwrap(),
);

let existing_table_fragment_state_tables = metadata_manager
.get_table_fragment_state_table_ids()
.await
.unwrap();

hummock_manager
.may_fill_backward_snapshot_group(&existing_table_fragment_state_tables)
.may_fill_backward_state_table_info()
.await
.unwrap();

Expand Down
15 changes: 9 additions & 6 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::iter::once;
use std::sync::Arc;

use futures::future::try_join_all;
Expand Down Expand Up @@ -868,15 +867,19 @@ impl CommandContext {

Command::DropStreamingJobs {
actors,
unregistered_table_fragment_ids,
unregistered_state_table_ids,
..
} => {
// Tell compute nodes to drop actors.
self.clean_up(actors.clone()).await?;

let unregistered_state_table_ids = unregistered_state_table_ids
.iter()
.map(|table_id| table_id.table_id)
.collect_vec();
self.barrier_manager_context
.hummock_manager
.unregister_table_fragments_ids(unregistered_table_fragment_ids.clone())
.unregister_table_ids(&unregistered_state_table_ids)
.await?;
}

Expand All @@ -894,11 +897,11 @@ impl CommandContext {
// since the failure could be recoverable.
// As such it needs to be handled here.
let table_id = table_fragments.table_id().table_id;
let mut table_ids = table_fragments.internal_table_ids();
table_ids.push(table_id);
self.barrier_manager_context
.hummock_manager
.unregister_table_fragments_ids(HashSet::from_iter(once(
table_fragments.table_id(),
)))
.unregister_table_ids(&table_ids)
.await?;

match &self.barrier_manager_context.metadata_manager {
Expand Down
45 changes: 28 additions & 17 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,23 @@ impl GlobalBarrierManagerContext {
}

async fn purge_state_table_from_hummock(&self) -> MetaResult<()> {
let existing_table_fragment_state_tables = self
.metadata_manager
.get_table_fragment_state_table_ids()
.await?;
self.hummock_manager
.purge(HashSet::from_iter(
existing_table_fragment_state_tables
.keys()
.cloned()
.map(TableId::new),
))
.await?;
let all_state_table_ids = match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr
.catalog_manager
.list_tables()
.await
.into_iter()
.map(|t| t.id)
.collect_vec(),
MetadataManager::V2(mgr) => mgr
.catalog_controller
.list_all_state_table_ids()
.await?
.into_iter()
.map(|id| id as u32)
.collect_vec(),
};
self.hummock_manager.purge(&all_state_table_ids).await?;
Ok(())
}

Expand Down Expand Up @@ -308,23 +313,29 @@ impl GlobalBarrierManagerContext {
let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled();
let applied = !dropped_actors.is_empty() || !cancelled.is_empty();
if !cancelled.is_empty() {
match &self.metadata_manager {
let unregister_table_ids = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.drop_table_fragments_vec(&cancelled)
.await?;
.await?
}
MetadataManager::V2(mgr) => {
for job_id in &cancelled {
let _ = mgr
let mut unregister_table_ids = Vec::new();
for job_id in cancelled {
let (_, table_ids_to_unregister) = mgr
.catalog_controller
.try_abort_creating_streaming_job(job_id.table_id as _, true)
.await?;
unregister_table_ids.extend(table_ids_to_unregister);
}
unregister_table_ids
.into_iter()
.map(|table_id| table_id as u32)
.collect()
}
};
self.hummock_manager
.unregister_table_fragments_ids(HashSet::from_iter(cancelled))
.unregister_table_ids(&unregister_table_ids)
.await?;
}
Ok(applied)
Expand Down
2 changes: 0 additions & 2 deletions src/meta/src/hummock/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ pub enum Error {
CompactorUnreachable(HummockContextId),
#[error("compaction group error: {0}")]
CompactionGroup(String),
#[error("snapshot group error: {0}")]
SnapshotGroup(String),
#[error("SST {0} is invalid")]
InvalidSst(HummockSstableObjectId),
#[error(transparent)]
Expand Down
Loading

0 comments on commit 37df492

Please sign in to comment.