Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): notify frontend with more hummock version info #18570

Merged
merged 4 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@
uint32 worker_id = 3;
}

message MetaSnapshot {

Check failure on line 412 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "11" with name "hummock_snapshot" on message "MetaSnapshot" was deleted without reserving the name "hummock_snapshot".

Check failure on line 412 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "11" with name "hummock_snapshot" on message "MetaSnapshot" was deleted without reserving the number "11".
message SnapshotVersion {
uint64 catalog_version = 1;
reserved 2;
Expand All @@ -433,7 +433,6 @@
GetSessionParamsResponse session_params = 20;
repeated catalog.Secret secrets = 23;
repeated common.WorkerNode nodes = 10;
hummock.HummockSnapshot hummock_snapshot = 11;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
hummock.WriteLimits hummock_write_limits = 16;
Expand Down Expand Up @@ -464,7 +463,7 @@

message Recovery {}

message SubscribeResponse {

Check failure on line 466 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "14" with name "hummock_snapshot" on message "SubscribeResponse" was deleted without reserving the name "hummock_snapshot".

Check failure on line 466 in proto/meta.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Previously present field "14" with name "hummock_snapshot" on message "SubscribeResponse" was deleted without reserving the number "14".
enum Operation {
UNSPECIFIED = 0;
ADD = 1;
Expand All @@ -485,7 +484,6 @@
user.UserInfo user = 11;
SetSessionParamRequest session_param = 26;
common.WorkerNode node = 13;
hummock.HummockSnapshot hummock_snapshot = 14;
hummock.HummockVersionDeltas hummock_version_deltas = 15;
MetaSnapshot snapshot = 16;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 17;
Expand Down
1 change: 0 additions & 1 deletion src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ where
Info::HummockVersionDeltas(version_delta) => {
version_delta.version_deltas[0].id > info.hummock_version.as_ref().unwrap().id
}
Info::HummockSnapshot(_) => true,
Info::MetaBackupManifestId(_) => true,
Info::SystemParams(_) | Info::SessionParam(_) => true,
Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
Expand Down
32 changes: 10 additions & 22 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::SessionConfig;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::ObserverState;
use risingwave_hummock_sdk::FrontendHummockVersion;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::hummock::{HummockVersionDeltas, HummockVersionStats};
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{FragmentWorkerSlotMapping, MetaSnapshot, SubscribeResponse};
Expand Down Expand Up @@ -85,11 +86,8 @@ impl ObserverState for FrontendObserverNode {
resp
)
}
Info::HummockSnapshot(_) => {
self.handle_hummock_snapshot_notification(resp);
}
Info::HummockVersionDeltas(_) => {
panic!("frontend node should not receive HummockVersionDeltas");
Info::HummockVersionDeltas(deltas) => {
self.handle_hummock_snapshot_notification(deltas);
}
Info::MetaBackupManifestId(_) => {
panic!("frontend node should not receive MetaBackupManifestId");
Expand Down Expand Up @@ -141,8 +139,7 @@ impl ObserverState for FrontendObserverNode {
connections,
users,
nodes,
hummock_snapshot,
hummock_version: _,
hummock_version,
meta_backup_manifest_id: _,
hummock_write_limits: _,
streaming_worker_slot_mappings,
Expand Down Expand Up @@ -195,7 +192,9 @@ impl ObserverState for FrontendObserverNode {
convert_worker_slot_mapping(&serving_worker_slot_mappings),
);
self.hummock_snapshot_manager
.update(hummock_snapshot.unwrap());
.init(FrontendHummockVersion::from_protobuf(
hummock_version.unwrap(),
));

let snapshot_version = version.unwrap();
catalog_guard.set_version(snapshot_version.catalog_version);
Expand Down Expand Up @@ -465,19 +464,8 @@ impl FrontendObserverNode {
}

/// Update max committed epoch in `HummockSnapshotManager`.
fn handle_hummock_snapshot_notification(&self, resp: SubscribeResponse) {
let Some(info) = resp.info.as_ref() else {
return;
};
match info {
Info::HummockSnapshot(hummock_snapshot) => match resp.operation() {
Operation::Update => {
self.hummock_snapshot_manager.update(*hummock_snapshot);
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
_ => unreachable!(),
}
fn handle_hummock_snapshot_notification(&self, deltas: HummockVersionDeltas) {
self.hummock_snapshot_manager.update(deltas);
}

fn handle_secret_notification(&mut self, resp: SubscribeResponse) {
Expand Down
132 changes: 80 additions & 52 deletions src/frontend/src/scheduler/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@

use std::assert_matches::assert_matches;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;

use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_common::must_match;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{
FrontendHummockVersion, FrontendHummockVersionDelta, HummockVersionId, INVALID_VERSION_ID,
};
use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch};
use risingwave_pb::hummock::PbHummockSnapshot;
use risingwave_pb::hummock::{HummockVersionDeltas, PbHummockSnapshot};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::watch;
Expand Down Expand Up @@ -85,7 +90,7 @@ impl ReadSnapshot {
// DO NOT implement `Clone` for `PinnedSnapshot` because it's a "resource" that should always be a
// singleton for each snapshot. Use `PinnedSnapshotRef` instead.
pub struct PinnedSnapshot {
value: PbHummockSnapshot,
value: FrontendHummockVersion,
unpin_sender: UnboundedSender<Operation>,
}

Expand All @@ -103,26 +108,29 @@ impl PinnedSnapshot {
let epoch = if is_barrier_read {
batch_query_epoch::Epoch::Current(u64::MAX)
} else {
batch_query_epoch::Epoch::Committed(self.value.committed_epoch)
batch_query_epoch::Epoch::Committed(self.value.max_committed_epoch)
};
BatchQueryEpoch { epoch: Some(epoch) }
}

pub fn committed_epoch(&self) -> u64 {
self.value.committed_epoch
self.value.max_committed_epoch
}
}

impl Drop for PinnedSnapshot {
fn drop(&mut self) {
let _ = self.unpin_sender.send(Operation::Unpin(self.value));
let _ = self.unpin_sender.send(Operation::Unpin(self.value.id));
}
}

/// Returns an invalid snapshot, used for initial values.
fn invalid_snapshot() -> PbHummockSnapshot {
PbHummockSnapshot {
committed_epoch: INVALID_EPOCH,
fn invalid_snapshot() -> FrontendHummockVersion {
FrontendHummockVersion {
id: INVALID_VERSION_ID,
max_committed_epoch: 0,
state_table_info: HummockVersionStateTableInfo::from_protobuf(&HashMap::new()),
table_change_log: Default::default(),
}
}

Expand Down Expand Up @@ -170,43 +178,60 @@ impl HummockSnapshotManager {
self.latest_snapshot.borrow().clone()
}

pub fn init(&self, version: FrontendHummockVersion) {
self.update_inner(|_| Some(version));
}

/// Update the latest snapshot.
///
/// Should only be called by the observer manager.
pub fn update(&self, snapshot: PbHummockSnapshot) {
pub fn update(&self, deltas: HummockVersionDeltas) {
self.update_inner(|old_snapshot| {
if deltas.version_deltas.is_empty() {
return None;
}
let mut snapshot = old_snapshot.clone();
for delta in deltas.version_deltas {
snapshot.apply_delta(FrontendHummockVersionDelta::from_protobuf(delta));
}
Some(snapshot)
})
}

fn update_inner(
&self,
get_new_snapshot: impl FnOnce(&FrontendHummockVersion) -> Option<FrontendHummockVersion>,
) {
self.latest_snapshot.send_if_modified(move |old_snapshot| {
// Note(bugen): theoretically, the snapshots from the observer should always be
// monotonically increasing, so there's no need to `max` them or check whether they are
// the same. But we still do it here to be safe.
// TODO: turn this into an assertion.
let snapshot = PbHummockSnapshot {
committed_epoch: std::cmp::max(
old_snapshot.value.committed_epoch,
snapshot.committed_epoch,
),
let new_snapshot = get_new_snapshot(&old_snapshot.value);
let Some(snapshot) = new_snapshot else {
return false;
};

if old_snapshot.value == snapshot {
// Ignore the same snapshot
false
} else {
// First tell the worker that a new snapshot is going to be pinned.
self.worker_sender.send(Operation::Pin(snapshot)).unwrap();
// Then set the latest snapshot.
*old_snapshot = Arc::new(PinnedSnapshot {
value: snapshot,
unpin_sender: self.worker_sender.clone(),
});

true
if snapshot.id <= old_snapshot.value.id {
assert_eq!(
snapshot.id, old_snapshot.value.id,
"receive stale frontend version"
);
return false;
}
// First tell the worker that a new snapshot is going to be pinned.
self.worker_sender
.send(Operation::Pin(snapshot.id, snapshot.max_committed_epoch))
.unwrap();
// Then set the latest snapshot.
*old_snapshot = Arc::new(PinnedSnapshot {
value: snapshot,
unpin_sender: self.worker_sender.clone(),
});

true
});
}

/// Wait until the latest snapshot is newer than the given one.
pub async fn wait(&self, snapshot: PbHummockSnapshot) {
let mut rx = self.latest_snapshot.subscribe();
while rx.borrow_and_update().value.committed_epoch < snapshot.committed_epoch {
while rx.borrow_and_update().value.max_committed_epoch < snapshot.committed_epoch {
rx.changed().await.unwrap();
}
}
Expand All @@ -216,7 +241,7 @@ impl HummockSnapshotManager {
#[derive(Debug)]
enum PinState {
/// The snapshot is currently pinned by some sessions in this frontend.
Pinned,
Pinned(u64),

/// The snapshot is no longer pinned by any session in this frontend, but it's still considered
/// to be pinned by the meta service. It will be unpinned by the [`UnpinWorker`] in the next
Expand All @@ -228,34 +253,32 @@ enum PinState {
#[derive(Debug)]
enum Operation {
/// Mark the snapshot as pinned, sent when a new snapshot is pinned with `update`.
Pin(PbHummockSnapshot),
Pin(HummockVersionId, u64),

/// Mark the snapshot as unpinned, sent when all references to a [`PinnedSnapshot`] is dropped.
Unpin(PbHummockSnapshot),
Unpin(HummockVersionId),
}

impl Operation {
/// Returns whether the operation is for an invalid snapshot, which should be ignored.
fn is_invalid(&self) -> bool {
match self {
Operation::Pin(s) | Operation::Unpin(s) => s,
}
.committed_epoch
== INVALID_EPOCH
*match self {
Operation::Pin(id, _) | Operation::Unpin(id) => id,
} == INVALID_VERSION_ID
}
}

/// The key for the states map in [`UnpinWorker`].
///
/// The snapshot will be first sorted by `committed_epoch`, then by `current_epoch`.
#[derive(Debug, PartialEq, Clone)]
struct SnapshotKey(PbHummockSnapshot);
struct SnapshotKey(HummockVersionId);

impl Eq for SnapshotKey {}

impl Ord for SnapshotKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.0.committed_epoch.cmp(&other.0.committed_epoch)
self.0.to_u64().cmp(&other.0.to_u64())
}
}

Expand Down Expand Up @@ -319,15 +342,15 @@ impl UnpinWorker {
}

match operation {
Operation::Pin(snapshot) => {
Operation::Pin(version_id, committed_epoch) => {
self.states
.try_insert(SnapshotKey(snapshot), PinState::Pinned)
.try_insert(SnapshotKey(version_id), PinState::Pinned(committed_epoch))
.unwrap();
}
Operation::Unpin(snapshot) => match self.states.entry(SnapshotKey(snapshot)) {
Entry::Vacant(_v) => unreachable!("unpin a snapshot that is not pinned"),
Entry::Occupied(o) => {
assert_matches!(o.get(), PinState::Pinned);
assert_matches!(o.get(), PinState::Pinned(_));
*o.into_mut() = PinState::Unpinned;
}
},
Expand All @@ -338,18 +361,23 @@ impl UnpinWorker {
/// and clean up their entries.
async fn unpin_batch(&mut self) {
// Find the minimum snapshot that is pinned. Unpin all snapshots before it.
if let Some(min_snapshot) = self
if let Some((min_snapshot, min_committed_epoch)) = self
.states
.iter()
.find(|(_, s)| matches!(s, PinState::Pinned))
.map(|(k, _)| k.clone())
.find(|(_, s)| matches!(s, PinState::Pinned(_)))
.map(|(k, s)| {
(
k.clone(),
must_match!(s, PinState::Pinned(committed_epoch) => *committed_epoch),
)
})
{
if &min_snapshot == self.states.first_key_value().unwrap().0 {
// Nothing to unpin.
return;
}

let min_epoch = min_snapshot.0.committed_epoch;
let min_epoch = min_committed_epoch;

match self.meta_client.unpin_snapshot_before(min_epoch).await {
Ok(()) => {
Expand Down
10 changes: 8 additions & 2 deletions src/meta/service/src/notification_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use anyhow::{anyhow, Context};
use itertools::Itertools;
use risingwave_common::secret::{LocalSecretManager, SecretEncryption};
use risingwave_hummock_sdk::FrontendHummockVersion;
use risingwave_meta::manager::{MetadataManager, SessionParamsManagerImpl};
use risingwave_meta::MetaResult;
use risingwave_pb::backup_service::MetaBackupManifestId;
Expand Down Expand Up @@ -305,7 +306,12 @@ impl NotificationServiceImpl {

let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?;

let hummock_snapshot = Some(self.hummock_manager.latest_snapshot());
let hummock_version = self
.hummock_manager
.on_current_version(|version| {
FrontendHummockVersion::from_version(version).to_protobuf()
})
.await;

let session_params = match self.env.session_params_manager_impl_ref() {
SessionParamsManagerImpl::Kv(manager) => manager.get_params().await,
Expand All @@ -331,7 +337,7 @@ impl NotificationServiceImpl {
secrets: decrypted_secrets,
users,
nodes,
hummock_snapshot,
hummock_version: Some(hummock_version),
version: Some(SnapshotVersion {
catalog_version,
worker_node_version,
Expand Down
Loading
Loading