Skip to content

Commit

Permalink
Showing 23 changed files with 871 additions and 559 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
@@ -2242,6 +2242,23 @@ def section_hummock_write(outer_panels):
"uploading task size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_uploader_imm_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"uploader imm size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_uploader_imm_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL}) - "
f"sum({metric('state_store_uploader_uploading_task_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"unflushed imm size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('uploading_memory_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL}) - "
f"sum({metric('state_store_uploader_imm_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"orphan imm size - {{%s}} @ {{%s}}"
% (COMPONENT_LABEL, NODE_LABEL),
),
panels.target(
f"sum({metric('state_store_old_value_size')}) by ({COMPONENT_LABEL}, {NODE_LABEL})",
"old value size - {{%s}} @ {{%s}}"
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
@@ -621,6 +621,11 @@ pub struct StorageConfig {
#[serde(default = "default::storage::shared_buffer_flush_ratio")]
pub shared_buffer_flush_ratio: f32,

/// The minimum total flush size of shared buffer spill. When a shared buffer spilled is trigger,
/// the total flush size across multiple epochs should be at least higher than this size.
#[serde(default = "default::storage::shared_buffer_min_batch_flush_size_mb")]
pub shared_buffer_min_batch_flush_size_mb: usize,

/// The threshold for the number of immutable memtables to merge to a new imm.
#[serde(default = "default::storage::imm_merge_threshold")]
#[deprecated]
@@ -1322,6 +1327,10 @@ pub mod default {
0.8
}

pub fn shared_buffer_min_batch_flush_size_mb() -> usize {
800
}

pub fn imm_merge_threshold() -> usize {
0 // disable
}
1 change: 1 addition & 0 deletions src/config/docs.md
Original file line number Diff line number Diff line change
@@ -130,6 +130,7 @@ This page is automatically generated by `./risedev generate-example-config`
| share_buffers_sync_parallelism | parallelism while syncing share buffers into L0 SST. Should NOT be 0. | 1 |
| shared_buffer_capacity_mb | Maximum shared buffer size, writes attempting to exceed the capacity will stall until there is enough space. | |
| shared_buffer_flush_ratio | The shared buffer will start flushing data to object when the ratio of memory usage to the shared buffer capacity exceed such ratio. | 0.800000011920929 |
| shared_buffer_min_batch_flush_size_mb | The minimum total flush size of shared buffer spill. When a shared buffer spilled is trigger, the total flush size across multiple epochs should be at least higher than this size. | 800 |
| sstable_id_remote_fetch_number | Number of SST ids fetched from meta per RPC | 10 |
| write_conflict_detection_enabled | Whether to enable write conflict detection | true |

1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
@@ -117,6 +117,7 @@ stream_high_join_amplification_threshold = 2048
share_buffers_sync_parallelism = 1
share_buffer_compaction_worker_threads_number = 4
shared_buffer_flush_ratio = 0.800000011920929
shared_buffer_min_batch_flush_size_mb = 800
imm_merge_threshold = 0
write_conflict_detection_enabled = true
max_prefetch_block_number = 16
4 changes: 2 additions & 2 deletions src/storage/benches/bench_imm_compact.rs
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ fn criterion_benchmark(c: &mut Criterion) {
&batches,
|b, batches| {
b.to_async(FuturesExecutor).iter(|| async {
let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await;
let imm = merge_imms_in_memory(TableId::default(), batches.clone(), None).await;
assert_eq!(imm.key_count(), 10000 * 100);
assert_eq!(imm.value_count(), 10000 * 100);
})
@@ -72,7 +72,7 @@ fn criterion_benchmark(c: &mut Criterion) {
&later_batches,
|b, batches| {
b.to_async(FuturesExecutor).iter(|| async {
let imm = merge_imms_in_memory(TableId::default(), 0, batches.clone(), None).await;
let imm = merge_imms_in_memory(TableId::default(), batches.clone(), None).await;
assert_eq!(imm.key_count(), 2000 * 100);
assert_eq!(imm.value_count(), 2000 * 100 * 5);
})
13 changes: 5 additions & 8 deletions src/storage/src/hummock/compactor/shared_buffer_compact.rs
Original file line number Diff line number Diff line change
@@ -38,8 +38,7 @@ use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorMana
use crate::hummock::compactor::compaction_filter::DummyCompactionFilter;
use crate::hummock::compactor::context::{await_tree_key, CompactorContext};
use crate::hummock::compactor::{check_flush_result, CompactOutput, Compactor};
use crate::hummock::event_handler::uploader::{UploadTaskOutput, UploadTaskPayload};
use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::event_handler::uploader::UploadTaskOutput;
use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator, UserIterator};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchInner, SharedBufferBatchOldValues, SharedBufferKeyEntry,
@@ -60,11 +59,11 @@ const GC_WATERMARK_FOR_FLUSH: u64 = 0;
pub async fn compact(
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
payload: UploadTaskPayload,
payload: Vec<ImmutableMemtable>,
compaction_group_index: Arc<HashMap<TableId, CompactionGroupId>>,
filter_key_extractor_manager: FilterKeyExtractorManager,
) -> HummockResult<UploadTaskOutput> {
let mut grouped_payload: HashMap<CompactionGroupId, UploadTaskPayload> = HashMap::new();
let mut grouped_payload: HashMap<CompactionGroupId, Vec<ImmutableMemtable>> = HashMap::new();
for imm in &payload {
let compaction_group_id = match compaction_group_index.get(&imm.table_id) {
// compaction group id is used only as a hint for grouping different data.
@@ -145,7 +144,7 @@ async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
context: CompactorContext,
sstable_object_id_manager: SstableObjectIdManagerRef,
filter_key_extractor_manager: FilterKeyExtractorManager,
mut payload: UploadTaskPayload,
mut payload: Vec<ImmutableMemtable>,
) -> HummockResult<Vec<LocalSstableInfo>> {
if !IS_NEW_VALUE {
assert!(payload.iter().all(|imm| imm.has_old_value()));
@@ -330,7 +329,6 @@ async fn compact_shared_buffer<const IS_NEW_VALUE: bool>(
/// Merge multiple batches into a larger one
pub async fn merge_imms_in_memory(
table_id: TableId,
instance_id: LocalInstanceId,
imms: Vec<ImmutableMemtable>,
memory_tracker: Option<MemoryTracker>,
) -> ImmutableMemtable {
@@ -465,13 +463,12 @@ pub async fn merge_imms_in_memory(
memory_tracker,
)),
table_id,
instance_id,
}
}

/// Based on the incoming payload and opts, calculate the sharding method and sstable size of shared buffer compaction.
fn generate_splits(
payload: &UploadTaskPayload,
payload: &Vec<ImmutableMemtable>,
existing_table_ids: &HashSet<u32>,
storage_opts: &StorageOpts,
) -> (Vec<KeyRange>, u64, u32) {
392 changes: 215 additions & 177 deletions src/storage/src/hummock/event_handler/hummock_event_handler.rs

Large diffs are not rendered by default.

15 changes: 9 additions & 6 deletions src/storage/src/hummock/event_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ use risingwave_hummock_sdk::{HummockEpoch, SyncResult};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;

use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
use crate::hummock::shared_buffer::shared_buffer_batch::{SharedBufferBatch, SharedBufferBatchId};
use crate::hummock::HummockResult;
use crate::mem_table::ImmutableMemtable;
use crate::store::SealCurrentEpochOptions;
@@ -57,7 +57,7 @@ pub enum HummockEvent {
/// An epoch is going to be synced. Once the event is processed, there will be no more flush
/// task on this epoch. Previous concurrent flush task join handle will be returned by the join
/// handle sender.
AwaitSyncEpoch {
SyncEpoch {
new_sync_epoch: HummockEpoch,
sync_result_sender: oneshot::Sender<HummockResult<SyncResult>>,
},
@@ -67,7 +67,10 @@ pub enum HummockEvent {

Shutdown,

ImmToUploader(ImmutableMemtable),
ImmToUploader {
instance_id: SharedBufferBatchId,
imm: ImmutableMemtable,
},

SealEpoch {
epoch: HummockEpoch,
@@ -104,7 +107,7 @@ impl HummockEvent {
match self {
HummockEvent::BufferMayFlush => "BufferMayFlush".to_string(),

HummockEvent::AwaitSyncEpoch {
HummockEvent::SyncEpoch {
new_sync_epoch,
sync_result_sender: _,
} => format!("AwaitSyncEpoch epoch {} ", new_sync_epoch),
@@ -113,8 +116,8 @@ impl HummockEvent {

HummockEvent::Shutdown => "Shutdown".to_string(),

HummockEvent::ImmToUploader(imm) => {
format!("ImmToUploader {:?}", imm)
HummockEvent::ImmToUploader { instance_id, imm } => {
format!("ImmToUploader {} {}", instance_id, imm.batch_id())
}

HummockEvent::SealEpoch {
767 changes: 493 additions & 274 deletions src/storage/src/hummock/event_handler/uploader.rs

Large diffs are not rendered by default.

11 changes: 2 additions & 9 deletions src/storage/src/hummock/shared_buffer/shared_buffer_batch.rs
Original file line number Diff line number Diff line change
@@ -30,7 +30,6 @@ use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{FullKey, PointRange, TableKey, TableKeyRange, UserKey};
use risingwave_hummock_sdk::EpochWithGap;

use crate::hummock::event_handler::LocalInstanceId;
use crate::hummock::iterator::{
Backward, DeleteRangeIterator, DirectionEnum, Forward, HummockIterator,
HummockIteratorDirection, ValueMeta,
@@ -285,7 +284,6 @@ pub static SHARED_BUFFER_BATCH_ID_GENERATOR: LazyLock<AtomicU64> =
pub struct SharedBufferBatch {
pub(crate) inner: Arc<SharedBufferBatchInner>,
pub table_id: TableId,
pub instance_id: LocalInstanceId,
}

impl SharedBufferBatch {
@@ -327,7 +325,6 @@ impl SharedBufferBatch {
None,
)),
table_id,
instance_id: SHARED_BUFFER_BATCH_ID_GENERATOR.fetch_add(1, Relaxed),
}
}

@@ -519,7 +516,6 @@ impl SharedBufferBatch {
old_values: Option<SharedBufferBatchOldValues>,
size: usize,
table_id: TableId,
instance_id: LocalInstanceId,
tracker: Option<MemoryTracker>,
) -> Self {
let inner = SharedBufferBatchInner::new(
@@ -533,7 +529,6 @@ impl SharedBufferBatch {
SharedBufferBatch {
inner: Arc::new(inner),
table_id,
instance_id,
}
}

@@ -578,11 +573,9 @@ impl SharedBufferBatch {
) -> Self {
let inner =
SharedBufferBatchInner::new(epoch, spill_offset, sorted_items, None, size, None);
use crate::hummock::event_handler::TEST_LOCAL_INSTANCE_ID;
SharedBufferBatch {
inner: Arc::new(inner),
table_id,
instance_id: TEST_LOCAL_INSTANCE_ID,
}
}
}
@@ -1532,7 +1525,7 @@ mod tests {
];
// newer data comes first
let imms = vec![imm3, imm2, imm1];
let merged_imm = merge_imms_in_memory(table_id, 0, imms.clone(), None).await;
let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;

// Point lookup
for (i, items) in batch_items.iter().enumerate() {
@@ -1717,7 +1710,7 @@ mod tests {
];
// newer data comes first
let imms = vec![imm3, imm2, imm1];
let merged_imm = merge_imms_in_memory(table_id, 0, imms.clone(), None).await;
let merged_imm = merge_imms_in_memory(table_id, imms.clone(), None).await;

// Point lookup
for (i, items) in batch_items.iter().enumerate() {
7 changes: 4 additions & 3 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ use std::sync::Arc;

use arc_swap::ArcSwap;
use bytes::Bytes;
use futures::FutureExt;
use itertools::Itertools;
use more_asserts::assert_gt;
use risingwave_common::catalog::TableId;
@@ -521,15 +522,15 @@ impl StateStore for HummockStorage {
wait_for_epoch(&self.version_update_notifier_tx, wait_epoch).await
}

async fn sync(&self, epoch: u64) -> StorageResult<SyncResult> {
fn sync(&self, epoch: u64) -> impl SyncFuture {
let (tx, rx) = oneshot::channel();
self.hummock_event_sender
.send(HummockEvent::AwaitSyncEpoch {
.send(HummockEvent::SyncEpoch {
new_sync_epoch: epoch,
sync_result_sender: tx,
})
.expect("should send success");
Ok(rx.await.expect("should wait success")?)
rx.map(|recv_result| Ok(recv_result.expect("should wait success")?))
}

fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) {
3 changes: 1 addition & 2 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
@@ -541,7 +541,6 @@ impl LocalHummockStorage {
old_values,
size,
table_id,
instance_id,
Some(tracker),
);
self.spill_offset += 1;
@@ -551,7 +550,7 @@ impl LocalHummockStorage {
// insert imm to uploader
if !self.is_replicated {
self.event_sender
.send(HummockEvent::ImmToUploader(imm))
.send(HummockEvent::ImmToUploader { instance_id, imm })
.unwrap();
}
imm_size
9 changes: 6 additions & 3 deletions src/storage/src/memory.rs
Original file line number Diff line number Diff line change
@@ -654,10 +654,13 @@ impl<R: RangeKv> StateStore for RangeKvStateStore<R> {
}

#[allow(clippy::unused_async)]
async fn sync(&self, _epoch: u64) -> StorageResult<SyncResult> {
self.inner.flush()?;
fn sync(&self, _epoch: u64) -> impl SyncFuture {
let result = self.inner.flush();
// memory backend doesn't need to push to S3, so this is a no-op
Ok(SyncResult::default())
async move {
result?;
Ok(SyncResult::default())
}
}

fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {}
11 changes: 11 additions & 0 deletions src/storage/src/monitor/hummock_state_store_metrics.rs
Original file line number Diff line number Diff line change
@@ -75,6 +75,7 @@ pub struct HummockStateStoreMetrics {
// uploading task
pub uploader_uploading_task_size: GenericGauge<AtomicU64>,
pub uploader_uploading_task_count: IntGauge,
pub uploader_imm_size: GenericGauge<AtomicU64>,
pub uploader_upload_task_latency: Histogram,
pub uploader_syncing_epoch_count: IntGauge,
pub uploader_wait_poll_latency: Histogram,
@@ -324,6 +325,15 @@ impl HummockStateStoreMetrics {
)
.unwrap();

let uploader_imm_size = GenericGauge::new(
"state_store_uploader_imm_size",
"Total size of imms tracked by uploader",
)
.unwrap();
registry
.register(Box::new(uploader_imm_size.clone()))
.unwrap();

let opts = histogram_opts!(
"state_store_uploader_upload_task_latency",
"Latency of uploader uploading tasks",
@@ -466,6 +476,7 @@ impl HummockStateStoreMetrics {
spill_task_size_from_unsealed: spill_task_size.with_label_values(&["unsealed"]),
uploader_uploading_task_size,
uploader_uploading_task_count,
uploader_imm_size,
uploader_upload_task_latency,
uploader_syncing_epoch_count,
uploader_wait_poll_latency,
29 changes: 13 additions & 16 deletions src/storage/src/monitor/monitored_store.rs
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ use futures::{Future, TryFutureExt};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult};
use risingwave_hummock_sdk::HummockReadEpoch;
use thiserror_ext::AsReport;
use tokio::time::Instant;
use tracing::{error, Instrument};
@@ -286,23 +286,20 @@ impl<S: StateStore> StateStore for MonitoredStateStore<S> {
.inspect_err(|e| error!(error = %e.as_report(), "Failed in wait_epoch"))
}

async fn sync(&self, epoch: u64) -> StorageResult<SyncResult> {
// TODO: this metrics may not be accurate if we start syncing after `seal_epoch`. We may
// move this metrics to inside uploader
fn sync(&self, epoch: u64) -> impl SyncFuture {
let future = self.inner.sync(epoch).instrument_await("store_sync");
let timer = self.storage_metrics.sync_duration.start_timer();
let sync_result = self
.inner
.sync(epoch)
.instrument_await("store_sync")
.await
.inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
timer.observe_duration();
if sync_result.sync_size != 0 {
self.storage_metrics
.sync_size
.observe(sync_result.sync_size as _);
let sync_size = self.storage_metrics.sync_size.clone();
async move {
let sync_result = future
.await
.inspect_err(|e| error!(error = %e.as_report(), "Failed in sync"))?;
timer.observe_duration();
if sync_result.sync_size != 0 {
sync_size.observe(sync_result.sync_size as _);
}
Ok(sync_result)
}
Ok(sync_result)
}

fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) {
19 changes: 11 additions & 8 deletions src/storage/src/monitor/traced_store.rs
Original file line number Diff line number Diff line change
@@ -11,13 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use bytes::Bytes;
use futures::{Future, TryFutureExt};
use futures::{Future, FutureExt, TryFutureExt};
use risingwave_common::buffer::Bitmap;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_trace::{
init_collector, should_use_trace, ConcurrentId, MayTraceSpan, OperationResult, StorageType,
TraceResult, TraceSpan, TracedBytes, TracedSealCurrentEpochOptions, LOCAL_ID,
@@ -238,15 +239,17 @@ impl<S: StateStore> StateStore for TracedStateStore<S> {
res
}

async fn sync(&self, epoch: u64) -> StorageResult<SyncResult> {
fn sync(&self, epoch: u64) -> impl SyncFuture {
let span: MayTraceSpan = TraceSpan::new_sync_span(epoch, self.storage_type);

let sync_result = self.inner.sync(epoch).await;
let future = self.inner.sync(epoch);

span.may_send_result(OperationResult::Sync(
sync_result.as_ref().map(|res| res.sync_size).into(),
));
sync_result
future.map(move |sync_result| {
span.may_send_result(OperationResult::Sync(
sync_result.as_ref().map(|res| res.sync_size).into(),
));
sync_result
})
}

fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) {
4 changes: 4 additions & 0 deletions src/storage/src/opts.rs
Original file line number Diff line number Diff line change
@@ -39,6 +39,9 @@ pub struct StorageOpts {
/// The shared buffer will start flushing data to object when the ratio of memory usage to the
/// shared buffer capacity exceed such ratio.
pub shared_buffer_flush_ratio: f32,
/// The minimum total flush size of shared buffer spill. When a shared buffer spill is trigger,
/// the total flush size across multiple epochs should be at least higher than this size.
pub shared_buffer_min_batch_flush_size_mb: usize,
/// Remote directory for storing data and metadata objects.
pub data_directory: String,
/// Whether to enable write conflict detection
@@ -158,6 +161,7 @@ impl From<(&RwConfig, &SystemParamsReader, &StorageMemoryConfig)> for StorageOpt
.share_buffer_compaction_worker_threads_number,
shared_buffer_capacity_mb: s.shared_buffer_capacity_mb,
shared_buffer_flush_ratio: c.storage.shared_buffer_flush_ratio,
shared_buffer_min_batch_flush_size_mb: c.storage.shared_buffer_min_batch_flush_size_mb,
data_directory: p.data_directory().to_string(),
write_conflict_detection_enabled: c.storage.write_conflict_detection_enabled,
block_cache_capacity_mb: s.block_cache_capacity_mb,
6 changes: 3 additions & 3 deletions src/storage/src/panic_store.rs
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ use std::sync::Arc;
use bytes::Bytes;
use risingwave_common::buffer::Bitmap;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult};
use risingwave_hummock_sdk::HummockReadEpoch;

use crate::error::StorageResult;
use crate::storage_value::StorageValue;
@@ -159,8 +159,8 @@ impl StateStore for PanicStateStore {
}

#[allow(clippy::unused_async)]
async fn sync(&self, _epoch: u64) -> StorageResult<SyncResult> {
panic!("should not await sync epoch from the panic state store!");
fn sync(&self, _epoch: u64) -> impl SyncFuture {
async { panic!("should not await sync epoch from the panic state store!") }
}

fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {
4 changes: 3 additions & 1 deletion src/storage/src/store.rs
Original file line number Diff line number Diff line change
@@ -308,6 +308,8 @@ pub trait StateStoreWrite: StaticSendSync {
) -> StorageResult<usize>;
}

pub trait SyncFuture = Future<Output = StorageResult<SyncResult>> + Send + 'static;

pub trait StateStore: StateStoreRead + StaticSendSync + Clone {
type Local: LocalStateStore;

@@ -318,7 +320,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone {
epoch: HummockReadEpoch,
) -> impl Future<Output = StorageResult<()>> + Send + '_;

fn sync(&self, epoch: u64) -> impl Future<Output = StorageResult<SyncResult>> + Send + '_;
fn sync(&self, epoch: u64) -> impl SyncFuture;

/// update max current epoch in storage.
fn seal_epoch(&self, epoch: u64, is_checkpoint: bool);
27 changes: 18 additions & 9 deletions src/storage/src/store_impl.rs
Original file line number Diff line number Diff line change
@@ -212,7 +212,7 @@ pub mod verify {
use bytes::Bytes;
use risingwave_common::buffer::Bitmap;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult};
use risingwave_hummock_sdk::HummockReadEpoch;
use tracing::log::warn;

use crate::error::StorageResult;
@@ -511,11 +511,15 @@ pub mod verify {
self.actual.try_wait_epoch(epoch)
}

async fn sync(&self, epoch: u64) -> StorageResult<SyncResult> {
if let Some(expected) = &self.expected {
let _ = expected.sync(epoch).await;
fn sync(&self, epoch: u64) -> impl SyncFuture {
let expected_future = self.expected.as_ref().map(|expected| expected.sync(epoch));
let actual_future = self.actual.sync(epoch);
async move {
if let Some(expected_future) = expected_future {
expected_future.await?;
}
actual_future.await
}
self.actual.sync(epoch).await
}

fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) {
@@ -731,6 +735,8 @@ pub mod boxed_state_store {

use bytes::Bytes;
use dyn_clone::{clone_trait_object, DynClone};
use futures::future::BoxFuture;
use futures::FutureExt;
use risingwave_common::buffer::Bitmap;
use risingwave_hummock_sdk::key::{TableKey, TableKeyRange};
use risingwave_hummock_sdk::{HummockReadEpoch, SyncResult};
@@ -1017,7 +1023,7 @@ pub mod boxed_state_store {
pub trait DynamicDispatchedStateStoreExt: StaticSendSync {
async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()>;

async fn sync(&self, epoch: u64) -> StorageResult<SyncResult>;
fn sync(&self, epoch: u64) -> BoxFuture<'static, StorageResult<SyncResult>>;

fn seal_epoch(&self, epoch: u64, is_checkpoint: bool);

@@ -1034,8 +1040,8 @@ pub mod boxed_state_store {
self.try_wait_epoch(epoch).await
}

async fn sync(&self, epoch: u64) -> StorageResult<SyncResult> {
self.sync(epoch).await
fn sync(&self, epoch: u64) -> BoxFuture<'static, StorageResult<SyncResult>> {
self.sync(epoch).boxed()
}

fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) {
@@ -1118,7 +1124,10 @@ pub mod boxed_state_store {
self.deref().try_wait_epoch(epoch)
}

fn sync(&self, epoch: u64) -> impl Future<Output = StorageResult<SyncResult>> + Send + '_ {
fn sync(
&self,
epoch: u64,
) -> impl Future<Output = StorageResult<SyncResult>> + Send + 'static {
self.deref().sync(epoch)
}

77 changes: 41 additions & 36 deletions src/stream/src/task/barrier_manager/managed_state.rs
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@ use std::sync::Arc;
use anyhow::anyhow;
use await_tree::InstrumentAwait;
use futures::stream::FuturesOrdered;
use futures::{FutureExt, StreamExt};
use futures::{FutureExt, StreamExt, TryFutureExt};
use prometheus::HistogramTimer;
use risingwave_common::must_match;
use risingwave_hummock_sdk::SyncResult;
@@ -96,43 +96,48 @@ fn sync_epoch(
kind: BarrierKind,
) -> impl Future<Output = StreamResult<Option<SyncResult>>> + 'static {
let barrier_sync_latency = streaming_metrics.barrier_sync_latency.clone();
let state_store = state_store.clone();

async move {
let sync_result = match kind {
BarrierKind::Unspecified => unreachable!(),
BarrierKind::Initial => {
if let Some(hummock) = state_store.as_hummock() {
let mce = hummock.get_pinned_version().max_committed_epoch();
assert_eq!(
mce, prev_epoch,
"first epoch should match with the current version",
);
}
tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
None
}
BarrierKind::Barrier => None,
BarrierKind::Checkpoint => {
let timer = barrier_sync_latency.start_timer();
let sync_result = dispatch_state_store!(state_store, store, {
store
.sync(prev_epoch)
.instrument_await(format!("sync_epoch (epoch {})", prev_epoch))
.await
.inspect_err(|e| {
tracing::error!(
prev_epoch,
error = %e.as_report(),
"Failed to sync state store",
);
})
})?;
timer.observe_duration();
Some(sync_result)
let sync_result_future = match kind {
BarrierKind::Unspecified => unreachable!(),
BarrierKind::Initial => {
if let Some(hummock) = state_store.as_hummock() {
let mce = hummock.get_pinned_version().max_committed_epoch();
assert_eq!(
mce, prev_epoch,
"first epoch should match with the current version",
);
}
};
Ok(sync_result)
tracing::info!(?prev_epoch, "ignored syncing data for the first barrier");
None
}
BarrierKind::Barrier => None,
BarrierKind::Checkpoint => {
let timer = barrier_sync_latency.start_timer();
let sync_result_future = dispatch_state_store!(state_store, store, {
store
.sync(prev_epoch)
.instrument_await(format!("sync_epoch (epoch {})", prev_epoch))
.inspect_ok(move |_| {
timer.observe_duration();
})
.inspect_err(move |e| {
tracing::error!(
prev_epoch,
error = %e.as_report(),
"Failed to sync state store",
);
})
.boxed()
});
Some(sync_result_future)
}
};
async move {
if let Some(sync_result_future) = sync_result_future {
Ok(Some(sync_result_future.await?))
} else {
Ok(None)
}
}
}

0 comments on commit 869db4b

Please sign in to comment.