Skip to content

Commit

Permalink
feat(meta): support database checkpoint isolation (#19173)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Nov 8, 2024
1 parent c658340 commit 3f0179c
Show file tree
Hide file tree
Showing 30 changed files with 957 additions and 596 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,9 +817,9 @@ def section_streaming(outer_panels):
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
"current level of congestion within the system.",
[
panels.target(f"{metric('all_barrier_nums')}", "all_barrier"),
panels.target(f"{metric('all_barrier_nums')}", "all_barrier {{database_id}}"),
panels.target(
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier"
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier {{database_id}}"
),
panels.target(
f"{metric('meta_snapshot_backfill_inflight_barrier_num')}",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def section_overview(panels):
[
panels.target(
f"{metric('all_barrier_nums')} >= bool 200",
"Too Many Barriers",
"Too Many Barriers {{database_id}}",
),
panels.target(
f"sum(rate({metric('recovery_latency_count')}[$__rate_interval])) > bool 0 + sum({metric('recovery_failure_cnt')}) > bool 0",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ message InjectBarrierRequest {
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
uint64 partial_graph_id = 6;

repeated common.ActorInfo broadcast_info = 8;
repeated stream_plan.StreamActor actors_to_build = 9;
Expand Down Expand Up @@ -48,7 +48,7 @@ message BarrierCompleteResponse {
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
uint32 partial_graph_id = 8;
uint64 partial_graph_id = 8;
// prev_epoch of barrier
uint64 epoch = 9;
}
Expand All @@ -69,7 +69,7 @@ message StreamingControlStreamRequest {
}

message RemovePartialGraphRequest {
repeated uint32 partial_graph_ids = 1;
repeated uint64 partial_graph_ids = 1;
}

oneof request {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/model/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::SubscriptionId;
use crate::{ObjectId, SubscriptionId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "subscription")]
Expand All @@ -28,7 +28,7 @@ pub struct Model {
pub retention_seconds: i64,
pub definition: String,
pub subscription_state: i32,
pub dependent_table_id: i32,
pub dependent_table_id: ObjectId,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
3 changes: 1 addition & 2 deletions src/meta/service/src/scale_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::catalog::TableId;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{RescheduleOptions, ScaleControllerRef, WorkerReschedule};
Expand Down Expand Up @@ -123,7 +123,6 @@ impl ScaleService for ScaleServiceImpl {
.split_fragment_map_by_database(worker_reschedules)
.await?
{
let database_id = DatabaseId::new(database_id as _);
let streaming_job_ids = self
.metadata_manager
.catalog_controller
Expand Down
10 changes: 2 additions & 8 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ impl StreamManagerService for StreamServiceImpl {
async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(
DatabaseId::new(database_id as _),
Command::pause(PausedReason::Manual),
)
.run_command(database_id, Command::pause(PausedReason::Manual))
.await?;
}
Ok(Response::new(PauseResponse {}))
Expand All @@ -96,10 +93,7 @@ impl StreamManagerService for StreamServiceImpl {
async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(
DatabaseId::new(database_id as _),
Command::resume(PausedReason::Manual),
)
.run_command(database_id, Command::resume(PausedReason::Manual))
.await?;
}
Ok(Response::new(ResumeResponse {}))
Expand Down
18 changes: 8 additions & 10 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::hash::ActorMapping;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::PbWorkerNode;
use risingwave_pb::meta::table_fragments::PbActorStatus;
Expand Down Expand Up @@ -106,27 +106,26 @@ impl ReplaceTablePlan {
let mut fragment_changes = HashMap::new();
for fragment in self.new_table_fragments.fragments.values() {
let fragment_change = CommandFragmentChanges::NewFragment(
self.streaming_job.database_id().into(),
self.streaming_job.id().into(),
InflightFragmentInfo {
actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id as i32,
actor.actor_id,
self.new_table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id(),
.worker_id() as WorkerId,
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| *table_id as ObjectId)
.map(|table_id| TableId::new(*table_id))
.collect(),
},
);
Expand Down Expand Up @@ -172,19 +171,19 @@ impl CreateStreamingJobCommandInfo {
.iter()
.map(|actor| {
(
actor.actor_id as i32,
actor.actor_id,
self.table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id(),
.worker_id() as WorkerId,
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| *table_id as ObjectId)
.map(|table_id| TableId::new(*table_id))
.collect(),
},
)
Expand All @@ -207,7 +206,7 @@ pub enum CreateStreamingJobType {
/// [`Command`] is the input of [`crate::barrier::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
#[derive(Debug, Clone, strum::Display)]
#[derive(Debug, strum::Display)]
pub enum Command {
/// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
/// all messages before the checkpoint barrier should have been committed.
Expand Down Expand Up @@ -336,7 +335,6 @@ impl Command {
(
fragment_id,
CommandFragmentChanges::NewFragment(
info.streaming_job.database_id().into(),
info.streaming_job.id().into(),
fragment_info,
),
Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/barrier/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::cmp::max;
use std::collections::HashMap;
use std::ops::Bound::{Excluded, Unbounded};

use risingwave_common::catalog::TableId;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
Expand Down Expand Up @@ -78,7 +78,6 @@ impl CreatingStreamingJobControl {
let actors_to_create = info.table_fragments.actors_to_create();
let graph_info = InflightStreamingJobInfo {
job_id: table_id,
database_id: info.streaming_job.database_id().into(),
fragment_infos,
};

Expand Down Expand Up @@ -165,6 +164,7 @@ impl CreatingStreamingJobControl {
}

fn inject_barrier(
database_id: DatabaseId,
table_id: TableId,
control_stream_manager: &mut ControlStreamManager,
barrier_control: &mut CreatingStreamingJobBarrierControl,
Expand All @@ -177,6 +177,7 @@ impl CreatingStreamingJobControl {
}: CreatingJobInjectBarrierInfo,
) -> MetaResult<()> {
let node_to_collect = control_stream_manager.inject_barrier(
database_id,
Some(table_id),
mutation,
&barrier_info,
Expand Down Expand Up @@ -235,6 +236,7 @@ impl CreatingStreamingJobControl {
.on_new_upstream_epoch(barrier_info, start_consume_upstream)
{
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
self.info.table_fragments.table_id(),
control_stream_manager,
&mut self.barrier_control,
Expand Down Expand Up @@ -263,6 +265,7 @@ impl CreatingStreamingJobControl {
let table_id = self.info.table_fragments.table_id();
for info in prev_barriers_to_inject {
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
table_id,
control_stream_manager,
&mut self.barrier_control,
Expand Down
Loading

0 comments on commit 3f0179c

Please sign in to comment.