Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Browse files Browse the repository at this point in the history
…nto li0k/storage_support_merge_cg_part
  • Loading branch information
Li0k committed Sep 9, 2024
2 parents c7c6fa4 + 29d2e1e commit 7be2d3d
Show file tree
Hide file tree
Showing 27 changed files with 77 additions and 64 deletions.
2 changes: 1 addition & 1 deletion docker/docker-compose-distributed-etcd.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
compactor-0:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-distributed.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
compactor-0:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-etcd.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-azblob.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-gcs.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-local-fs.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-obs.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-oss.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-s3.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-sqlite.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.10.0}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v2.0.0-rc.1}
services:
risingwave-standalone:
<<: *image
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ struct RwDdlProgress {

#[system_catalog(table, "rw_catalog.rw_ddl_progress")]
async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwDdlProgress>> {
let ddl_progresses = reader.meta_client.list_ddl_progress().await?;
let ddl_progresses = reader.meta_client.get_ddl_progress().await?;

let table_ids = ddl_progresses
.iter()
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/handler/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ pub async fn handle_show_object(
.into());
}
ShowObject::Jobs => {
let resp = session.env().meta_client().list_ddl_progress().await?;
let resp = session.env().meta_client().get_ddl_progress().await?;
let rows = resp.into_iter().map(|job| ShowJobRow {
id: job.id as i64,
statement: job.statement,
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/meta_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub trait FrontendMetaClient: Send + Sync {

async fn set_session_param(&self, param: String, value: Option<String>) -> Result<String>;

async fn list_ddl_progress(&self) -> Result<Vec<DdlProgress>>;
async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>>;

async fn get_tables(&self, table_ids: &[u32]) -> Result<HashMap<u32, Table>>;

Expand Down Expand Up @@ -232,7 +232,7 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
self.0.set_session_param(param, value).await
}

async fn list_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
async fn get_ddl_progress(&self) -> Result<Vec<DdlProgress>> {
let ddl_progress = self.0.get_ddl_progress().await?;
Ok(ddl_progress)
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ impl FrontendMetaClient for MockFrontendMetaClient {
Ok("".to_string())
}

async fn list_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
async fn get_ddl_progress(&self) -> RpcResult<Vec<DdlProgress>> {
Ok(vec![])
}

Expand Down
3 changes: 3 additions & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,9 @@ impl GlobalBarrierManager {
if let Some(request) = request {
match request {
BarrierManagerRequest::GetDdlProgress(result_tx) => {
// Progress of normal backfill
let mut progress = self.checkpoint_control.create_mview_tracker.gen_ddl_progress();
// Progress of snapshot backfill
for creating_job in self.checkpoint_control.creating_streaming_job_controls.values() {
progress.extend([(creating_job.info.table_fragments.table_id().table_id, creating_job.gen_ddl_progress())]);
}
Expand Down Expand Up @@ -1634,6 +1636,7 @@ impl GlobalBarrierManagerContext {
Ok(info)
}

/// Serving `SHOW JOBS / SELECT * FROM rw_ddl_progress`
pub async fn get_ddl_progress(&self) -> MetaResult<Vec<DdlProgress>> {
let mut ddl_progress = {
let (tx, rx) = oneshot::channel();
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::executor::backfill::utils::{
update_pos_by_vnode, BackfillProgressPerVnode, BackfillRateLimiter, BackfillState,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

type Builders = HashMap<VirtualNode, DataChunkBuilder>;

Expand All @@ -56,7 +56,7 @@ pub struct ArrangementBackfillExecutor<S: StateStore, SD: ValueRowSerde> {
/// The column indices need to be forwarded to the downstream from the upstream and table scan.
output_indices: Vec<usize>,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

actor_id: ActorId,

Expand All @@ -79,7 +79,7 @@ where
upstream: Executor,
state_table: StateTable<S>,
output_indices: Vec<usize>,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
rate_limit: Option<usize>,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::executor::backfill::CdcScanOptions;
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::UpdateMutation;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

/// `split_id`, `is_finished`, `row_count`, `cdc_offset` all occupy 1 column each.
const METADATA_STATE_LEN: usize = 4;
Expand All @@ -68,7 +68,7 @@ pub struct CdcBackfillExecutor<S: StateStore> {

// TODO: introduce a CdcBackfillProgress to report finish to Meta
// This object is just a stub right now
progress: Option<CreateMviewProgress>,
progress: Option<CreateMviewProgressReporter>,

metrics: CdcBackfillMetrics,

Expand All @@ -86,7 +86,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
upstream: Executor,
output_indices: Vec<usize>,
output_columns: Vec<ColumnDesc>,
progress: Option<CreateMviewProgress>,
progress: Option<CreateMviewProgressReporter>,
metrics: Arc<StreamingMetrics>,
state_table: StateTable<S>,
rate_limit_rps: Option<u32>,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/backfill/no_shuffle_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::executor::backfill::utils::{
METADATA_STATE_LEN,
};
use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
/// We can decode that into `BackfillState` on recovery.
Expand Down Expand Up @@ -76,7 +76,7 @@ pub struct BackfillExecutor<S: StateStore> {
output_indices: Vec<usize>,

/// PTAL at the docstring for `CreateMviewProgress` to understand how we compute it.
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

actor_id: ActorId,

Expand All @@ -100,7 +100,7 @@ where
upstream: Executor,
state_table: Option<StateTable<S>>,
output_indices: Vec<usize>,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
metrics: Arc<StreamingMetrics>,
chunk_size: usize,
rate_limit: Option<usize>,
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::executor::{
DispatcherBarrier, DispatcherMessage, Execute, Executor, Message, Mutation,
StreamExecutorError, StreamExecutorResult,
};
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

pub struct SnapshotBackfillExecutor<S: StateStore> {
/// Upstream table
Expand All @@ -55,7 +55,7 @@ pub struct SnapshotBackfillExecutor<S: StateStore> {
/// The column indices need to be forwarded to the downstream from the upstream and table scan.
output_indices: Vec<usize>,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

chunk_size: usize,
rate_limit: Option<usize>,
Expand All @@ -73,7 +73,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
upstream: Executor,
output_indices: Vec<usize>,
actor_ctx: ActorContextRef,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
chunk_size: usize,
rate_limit: Option<usize>,
barrier_rx: UnboundedReceiver<Barrier>,
Expand Down Expand Up @@ -617,7 +617,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
rate_limit: Option<usize>,
barrier_rx: &'a mut UnboundedReceiver<Barrier>,
output_indices: &'a [usize],
mut progress: CreateMviewProgress,
mut progress: CreateMviewProgressReporter,
first_recv_barrier: Barrier,
) {
let mut barrier_epoch = first_recv_barrier.epoch;
Expand Down
10 changes: 5 additions & 5 deletions src/stream/src/executor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

/// [`ChainExecutor`] is an executor that enables synchronization between the existing stream and
/// newly appended executors. Currently, [`ChainExecutor`] is mainly used to implement MV on MV
Expand All @@ -24,7 +24,7 @@ pub struct ChainExecutor {

upstream: Executor,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

actor_id: ActorId,

Expand All @@ -36,7 +36,7 @@ impl ChainExecutor {
pub fn new(
snapshot: Executor,
upstream: Executor,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
upstream_only: bool,
) -> Self {
Self {
Expand Down Expand Up @@ -115,12 +115,12 @@ mod test {
use super::ChainExecutor;
use crate::executor::test_utils::MockSource;
use crate::executor::{AddMutation, Barrier, Execute, Message, Mutation, PkIndices};
use crate::task::{CreateMviewProgress, LocalBarrierManager};
use crate::task::{CreateMviewProgressReporter, LocalBarrierManager};

#[tokio::test]
async fn test_basic() {
let barrier_manager = LocalBarrierManager::for_test();
let progress = CreateMviewProgress::for_test(barrier_manager);
let progress = CreateMviewProgressReporter::for_test(barrier_manager);
let actor_id = progress.actor_id();

let schema = Schema::new(vec![Field::unnamed(DataType::Int64)]);
Expand Down
10 changes: 7 additions & 3 deletions src/stream/src/executor/rearranged_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use futures::stream;
use futures::stream::select_with_strategy;

use crate::executor::prelude::*;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

/// `ChainExecutor` is an executor that enables synchronization between the existing stream and
/// newly appended executors. Currently, `ChainExecutor` is mainly used to implement MV on MV
Expand All @@ -31,7 +31,7 @@ pub struct RearrangedChainExecutor {

upstream: Executor,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,

actor_id: ActorId,
}
Expand Down Expand Up @@ -74,7 +74,11 @@ impl RearrangedMessage {
}

impl RearrangedChainExecutor {
pub fn new(snapshot: Executor, upstream: Executor, progress: CreateMviewProgress) -> Self {
pub fn new(
snapshot: Executor,
upstream: Executor,
progress: CreateMviewProgressReporter,
) -> Self {
Self {
snapshot,
upstream,
Expand Down
6 changes: 3 additions & 3 deletions src/stream/src/executor/source/source_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::source_executor::WAIT_BARRIER_MULTIPLE_TIMES;
use crate::executor::UpdateMutation;
use crate::task::CreateMviewProgress;
use crate::task::CreateMviewProgressReporter;

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
pub enum BackfillState {
Expand Down Expand Up @@ -92,7 +92,7 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {
/// Rate limit in rows/s.
rate_limit_rps: Option<u32>,

progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
}

/// Local variables used in the backfill stage.
Expand Down Expand Up @@ -244,7 +244,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
system_params: SystemParamsReaderRef,
backfill_state_store: BackfillStateTableHandler<S>,
rate_limit_rps: Option<u32>,
progress: CreateMviewProgress,
progress: CreateMviewProgressReporter,
) -> Self {
let source_split_change_count = metrics
.source_split_change_count
Expand Down
Loading

0 comments on commit 7be2d3d

Please sign in to comment.