Skip to content

Commit

Permalink
feat(storage): define and collect table watermark metadata (#13710)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Dec 7, 2023
1 parent 9b0d25c commit 9a3cfb5
Show file tree
Hide file tree
Showing 15 changed files with 816 additions and 41 deletions.
23 changes: 23 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,27 @@ message UncommittedEpoch {
repeated SstableInfo tables = 2;
}

message VnodeWatermark {
// The watermark shared by multiple vnodes
bytes watermark = 1;
// The set of vnodes that share the same watermark
common.Buffer vnode_bitmap = 2;
}

message TableWatermarks {
message EpochNewWatermarks {
repeated VnodeWatermark watermarks = 1;
uint64 epoch = 2;
}

// Table watermarks of a state table from all vnodes written in multiple epochs.
// Epochs should be sorted in ascending order, which means earlier epoch at the front
repeated EpochNewWatermarks epoch_watermarks = 1;

// The direction of the table watermark.
bool is_ascending = 2;
}

message HummockVersion {
message Levels {
repeated Level levels = 1;
Expand All @@ -124,6 +145,7 @@ message HummockVersion {
// Snapshots with epoch less than the safe epoch have been GCed.
// Reads against such an epoch will fail.
uint64 safe_epoch = 4;
map<uint64, TableWatermarks> table_watermarks = 5;
}

message HummockVersionDelta {
Expand All @@ -140,6 +162,7 @@ message HummockVersionDelta {
uint64 safe_epoch = 5;
bool trivial_move = 6;
repeated uint64 gc_object_ids = 7;
map<uint64, TableWatermarks> new_table_watermarks = 8;
}

message HummockVersionDeltas {
Expand Down
1 change: 1 addition & 0 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ message BarrierCompleteResponse {
}
repeated GroupedSstableInfo synced_sstables = 4;
uint32 worker_id = 5;
map<uint64, hummock.TableWatermarks> table_watermarks = 6;
}

// Before starting streaming, the leader node broadcast the actor-host table to needed workers.
Expand Down
17 changes: 12 additions & 5 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use await_tree::InstrumentAwait;
Expand Down Expand Up @@ -191,7 +192,7 @@ impl StreamService for StreamServiceImpl {
|err| tracing::error!(error = %err.as_report(), "failed to collect barrier"),
)?;

let synced_sstables = match kind {
let (synced_sstables, table_watermarks) = match kind {
BarrierKind::Unspecified => unreachable!(),
BarrierKind::Initial => {
if let Some(hummock) = self.env.state_store().as_hummock() {
Expand All @@ -205,21 +206,23 @@ impl StreamService for StreamServiceImpl {
epoch = req.prev_epoch,
"ignored syncing data for the first barrier"
);
Vec::new()
(Vec::new(), HashMap::new())
}
BarrierKind::Barrier => Vec::new(),
BarrierKind::Barrier => (Vec::new(), HashMap::new()),
BarrierKind::Checkpoint => {
let span = TracingContext::from_protobuf(&req.tracing_context).attach(
tracing::info_span!("sync_epoch", prev_epoch = req.prev_epoch),
);

// Must finish syncing data written in the epoch before respond back to ensure
// persistence of the state.
self.mgr
let sync_result = self
.mgr
.sync_epoch(req.prev_epoch)
.instrument(span)
.instrument_await(format!("sync_epoch (epoch {})", req.prev_epoch))
.await?
.await?;
(sync_result.uncommitted_ssts, sync_result.table_watermarks)
}
};

Expand All @@ -242,6 +245,10 @@ impl StreamService for StreamServiceImpl {
)
.collect_vec(),
worker_id: self.env.worker_id(),
table_watermarks: table_watermarks
.into_iter()
.map(|(key, value)| (key.table_id as u64, value.to_protobuf()))
.collect(),
}))
}

Expand Down
7 changes: 6 additions & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY;
use risingwave_common::util::tracing::TracingContext;
use risingwave_hummock_sdk::table_watermark::merge_multiple_new_table_watermarks;
use risingwave_hummock_sdk::{ExtendedSstableInfo, HummockSstableObjectId};
use risingwave_pb::catalog::table::TableType;
use risingwave_pb::ddl_service::DdlProgress;
Expand Down Expand Up @@ -1255,5 +1256,9 @@ fn collect_commit_epoch_info(resps: &mut [BarrierCompleteResponse]) -> CommitEpo
.collect_vec();
synced_ssts.append(&mut t);
}
CommitEpochInfo::new(synced_ssts, sst_to_worker)
CommitEpochInfo::new(
synced_ssts,
merge_multiple_new_table_watermarks(resps.iter().map(|resp| resp.table_watermarks.clone())),
sst_to_worker,
)
}
8 changes: 7 additions & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use risingwave_pb::hummock::{
version_update_payload, CompactTask, CompactTaskAssignment, CompactionConfig, GroupDelta,
HummockPinnedSnapshot, HummockPinnedVersion, HummockSnapshot, HummockVersion,
HummockVersionCheckpoint, HummockVersionDelta, HummockVersionDeltas, HummockVersionStats,
IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption,
IntraLevelDelta, SstableInfo, SubscribeCompactionEventRequest, TableOption, TableWatermarks,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
Expand Down Expand Up @@ -268,16 +268,19 @@ pub enum CompactionResumeTrigger {

pub struct CommitEpochInfo {
pub sstables: Vec<ExtendedSstableInfo>,
pub new_table_watermarks: HashMap<u64, TableWatermarks>,
pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
}

impl CommitEpochInfo {
pub fn new(
sstables: Vec<ExtendedSstableInfo>,
new_table_watermarks: HashMap<u64, TableWatermarks>,
sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
) -> Self {
Self {
sstables,
new_table_watermarks,
sst_to_context,
}
}
Expand All @@ -289,6 +292,7 @@ impl CommitEpochInfo {
) -> Self {
Self::new(
sstables.into_iter().map(Into::into).collect(),
HashMap::new(),
sst_to_context,
)
}
Expand Down Expand Up @@ -1413,6 +1417,7 @@ impl HummockManager {
) -> Result<Option<HummockSnapshot>> {
let CommitEpochInfo {
mut sstables,
new_table_watermarks,
sst_to_context,
} = commit_info;
let mut versioning_guard = write_lock!(self, versioning).await;
Expand Down Expand Up @@ -1444,6 +1449,7 @@ impl HummockManager {
build_version_delta_after_version(old_version),
);
new_version_delta.max_committed_epoch = epoch;
new_version_delta.new_table_watermarks = new_table_watermarks;
let mut new_hummock_version = old_version.clone();
new_hummock_version.id = new_version_delta.id;
let mut incorrect_ssts = vec![];
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ pub(super) fn create_init_version(default_compaction_config: CompactionConfig) -
levels: Default::default(),
max_committed_epoch: INVALID_EPOCH,
safe_epoch: INVALID_EPOCH,
table_watermarks: HashMap::new(),
};
for group_id in [
StaticCompactionGroupId::StateDefault as CompactionGroupId,
Expand Down Expand Up @@ -560,6 +561,7 @@ mod tests {
levels: Default::default(),
max_committed_epoch: 0,
safe_epoch: 0,
table_watermarks: HashMap::new(),
};
for cg in 1..3 {
version.levels.insert(
Expand Down
27 changes: 26 additions & 1 deletion src/meta/src/hummock/mock_hummock_meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::SystemTime;
Expand All @@ -32,7 +33,7 @@ use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTa
use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent;
use risingwave_pb::hummock::{
compact_task, CompactTask, HummockSnapshot, HummockVersion, SubscribeCompactionEventRequest,
SubscribeCompactionEventResponse, VacuumTask,
SubscribeCompactionEventResponse, TableWatermarks, VacuumTask,
};
use risingwave_rpc_client::error::{Result, RpcError};
use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient};
Expand Down Expand Up @@ -88,6 +89,30 @@ impl MockHummockMetaClient {
.await
.unwrap_or(None)
}

pub async fn commit_epoch_with_watermark(
&self,
epoch: HummockEpoch,
sstables: Vec<LocalSstableInfo>,
new_table_watermarks: HashMap<u64, TableWatermarks>,
) -> Result<()> {
let sst_to_worker = sstables
.iter()
.map(|LocalSstableInfo { sst_info, .. }| (sst_info.get_object_id(), self.context_id))
.collect();
self.hummock_manager
.commit_epoch(
epoch,
CommitEpochInfo::new(
sstables.into_iter().map(Into::into).collect(),
new_table_watermarks,
sst_to_worker,
),
)
.await
.map_err(mock_err)?;
Ok(())
}
}

fn mock_err(error: super::error::Error) -> RpcError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use super::StateTableId;
use crate::compaction_group::StaticCompactionGroupId;
use crate::key_range::KeyRangeCommon;
use crate::prost_key_range::KeyRangeExt;
use crate::table_watermark::PbTableWatermarksExt;
use crate::{can_concat, CompactionGroupId, HummockSstableId, HummockSstableObjectId};

pub struct GroupDeltasSummary {
Expand Down Expand Up @@ -530,7 +531,25 @@ impl HummockVersion {
}
self.id = version_delta.id;
self.max_committed_epoch = version_delta.max_committed_epoch;
self.safe_epoch = version_delta.safe_epoch;
for (table_id, table_watermarks) in &version_delta.new_table_watermarks {
match self.table_watermarks.entry(*table_id) {
Entry::Occupied(mut entry) => {
entry.get_mut().apply_new_table_watermarks(table_watermarks);
}
Entry::Vacant(entry) => {
entry.insert(table_watermarks.clone());
}
}
}
if version_delta.safe_epoch != self.safe_epoch {
assert!(version_delta.safe_epoch > self.safe_epoch);
self.table_watermarks
.values_mut()
.for_each(|table_watermarks| {
table_watermarks.clear_stale_epoch_watermark(version_delta.safe_epoch)
});
self.safe_epoch = version_delta.safe_epoch;
}
sst_split_info
}

Expand Down Expand Up @@ -949,6 +968,7 @@ pub fn build_version_delta_after_version(version: &HummockVersion) -> HummockVer
max_committed_epoch: version.max_committed_epoch,
group_deltas: Default::default(),
gc_object_ids: vec![],
new_table_watermarks: HashMap::new(),
}
}

Expand Down Expand Up @@ -1197,6 +1217,7 @@ mod tests {
)]),
max_committed_epoch: 0,
safe_epoch: 0,
table_watermarks: HashMap::new(),
};
assert_eq!(version.get_object_ids().len(), 0);

Expand Down Expand Up @@ -1259,6 +1280,7 @@ mod tests {
]),
max_committed_epoch: 0,
safe_epoch: 0,
table_watermarks: HashMap::new(),
};
let version_delta = HummockVersionDelta {
id: 1,
Expand Down Expand Up @@ -1341,6 +1363,7 @@ mod tests {
]),
max_committed_epoch: 0,
safe_epoch: 0,
table_watermarks: HashMap::new(),
}
);
}
Expand Down
2 changes: 2 additions & 0 deletions src/storage/hummock_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#![feature(type_alias_impl_trait)]
#![feature(impl_trait_in_assoc_type)]
#![feature(is_sorted)]
#![feature(let_chains)]
#![feature(btree_cursors)]
#![feature(split_array)]

mod key_cmp;
Expand Down
Loading

0 comments on commit 9a3cfb5

Please sign in to comment.