Skip to content

Commit

Permalink
feat(meta): support database checkpoint isolation
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Nov 4, 2024
1 parent e7e4a2c commit 07a21a7
Show file tree
Hide file tree
Showing 26 changed files with 941 additions and 585 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -817,9 +817,9 @@ def section_streaming(outer_panels):
"The number of barriers that have been ingested but not completely processed. This metric reflects the "
"current level of congestion within the system.",
[
panels.target(f"{metric('all_barrier_nums')}", "all_barrier"),
panels.target(f"{metric('all_barrier_nums')}", "all_barrier {{database_id}}"),
panels.target(
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier"
f"{metric('in_flight_barrier_nums')}", "in_flight_barrier {{database_id}}"
),
panels.target(
f"{metric('meta_snapshot_backfill_inflight_barrier_num')}",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def section_overview(panels):
[
panels.target(
f"{metric('all_barrier_nums')} >= bool 200",
"Too Many Barriers",
"Too Many Barriers {{database_id}}",
),
panels.target(
f"sum(rate({metric('recovery_latency_count')}[$__rate_interval])) > bool 0 + sum({metric('recovery_failure_cnt')}) > bool 0",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

21 changes: 18 additions & 3 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,27 @@ import "stream_plan.proto";
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message PartialGraphId {
message Database {
uint32 database_id = 1;
}
message CreatingJob {
uint32 database_id = 1;
uint32 job_id = 2;
}

oneof graph_id {
Database database = 1;
CreatingJob creating_job = 2;
}
}

message InjectBarrierRequest {
string request_id = 1;
stream_plan.Barrier barrier = 2;
repeated uint32 actor_ids_to_collect = 4;
repeated uint32 table_ids_to_sync = 5;
uint32 partial_graph_id = 6;
PartialGraphId partial_graph_id = 6;

Check failure on line 32 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "6" with name "partial_graph_id" on message "InjectBarrierRequest" changed type from "uint32" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.

repeated common.ActorInfo broadcast_info = 8;
repeated stream_plan.StreamActor actors_to_build = 9;
Expand Down Expand Up @@ -48,7 +63,7 @@ message BarrierCompleteResponse {
uint32 worker_id = 5;
map<uint32, hummock.TableWatermarks> table_watermarks = 6;
repeated hummock.SstableInfo old_value_sstables = 7;
uint32 partial_graph_id = 8;
PartialGraphId partial_graph_id = 8;

Check failure on line 66 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "8" with name "partial_graph_id" on message "BarrierCompleteResponse" changed type from "uint32" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
// prev_epoch of barrier
uint64 epoch = 9;
}
Expand All @@ -69,7 +84,7 @@ message StreamingControlStreamRequest {
}

message RemovePartialGraphRequest {
repeated uint32 partial_graph_ids = 1;
repeated PartialGraphId partial_graph_ids = 1;

Check failure on line 87 in proto/stream_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "1" with name "partial_graph_ids" on message "RemovePartialGraphRequest" changed type from "uint32" to "message". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.
}

oneof request {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/model/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::SubscriptionId;
use crate::{ObjectId, SubscriptionId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "subscription")]
Expand All @@ -28,7 +28,7 @@ pub struct Model {
pub retention_seconds: i64,
pub definition: String,
pub subscription_state: i32,
pub dependent_table_id: i32,
pub dependent_table_id: ObjectId,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down
18 changes: 8 additions & 10 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::hash::ActorMapping;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::{ObjectId, WorkerId};
use risingwave_meta_model::WorkerId;
use risingwave_pb::catalog::{CreateType, Table};
use risingwave_pb::common::PbWorkerNode;
use risingwave_pb::meta::table_fragments::PbActorStatus;
Expand Down Expand Up @@ -106,27 +106,26 @@ impl ReplaceTablePlan {
let mut fragment_changes = HashMap::new();
for fragment in self.new_table_fragments.fragments.values() {
let fragment_change = CommandFragmentChanges::NewFragment(
self.streaming_job.database_id().into(),
self.streaming_job.id().into(),
InflightFragmentInfo {
actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id as i32,
actor.actor_id,
self.new_table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id(),
.worker_id() as WorkerId,
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| *table_id as ObjectId)
.map(|table_id| TableId::new(*table_id))
.collect(),
},
);
Expand Down Expand Up @@ -172,19 +171,19 @@ impl CreateStreamingJobCommandInfo {
.iter()
.map(|actor| {
(
actor.actor_id as i32,
actor.actor_id,
self.table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id(),
.worker_id() as WorkerId,
)
})
.collect(),
state_table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| *table_id as ObjectId)
.map(|table_id| TableId::new(*table_id))
.collect(),
},
)
Expand All @@ -207,7 +206,7 @@ pub enum CreateStreamingJobType {
/// [`Command`] is the input of [`crate::barrier::GlobalBarrierWorker`]. For different commands,
/// it will build different barriers to send, and may do different stuffs after the barrier is
/// collected.
#[derive(Debug, Clone, strum::Display)]
#[derive(Debug, strum::Display)]
pub enum Command {
/// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed
/// all messages before the checkpoint barrier should have been committed.
Expand Down Expand Up @@ -336,7 +335,6 @@ impl Command {
(
fragment_id,
CommandFragmentChanges::NewFragment(
info.streaming_job.database_id().into(),
info.streaming_job.id().into(),
fragment_info,
),
Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/barrier/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::cmp::max;
use std::collections::HashMap;
use std::ops::Bound::{Excluded, Unbounded};

use risingwave_common::catalog::TableId;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::metrics::LabelGuardedIntGauge;
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
Expand Down Expand Up @@ -78,7 +78,6 @@ impl CreatingStreamingJobControl {
let actors_to_create = info.table_fragments.actors_to_create();
let graph_info = InflightStreamingJobInfo {
job_id: table_id,
database_id: info.streaming_job.database_id().into(),
fragment_infos,
};

Expand Down Expand Up @@ -165,6 +164,7 @@ impl CreatingStreamingJobControl {
}

fn inject_barrier(
database_id: DatabaseId,
table_id: TableId,
control_stream_manager: &mut ControlStreamManager,
barrier_control: &mut CreatingStreamingJobBarrierControl,
Expand All @@ -177,6 +177,7 @@ impl CreatingStreamingJobControl {
}: CreatingJobInjectBarrierInfo,
) -> MetaResult<()> {
let node_to_collect = control_stream_manager.inject_barrier(
database_id,
Some(table_id),
mutation,
&barrier_info,
Expand Down Expand Up @@ -235,6 +236,7 @@ impl CreatingStreamingJobControl {
.on_new_upstream_epoch(barrier_info, start_consume_upstream)
{
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
self.info.table_fragments.table_id(),
control_stream_manager,
&mut self.barrier_control,
Expand Down Expand Up @@ -263,6 +265,7 @@ impl CreatingStreamingJobControl {
let table_id = self.info.table_fragments.table_id();
for info in prev_barriers_to_inject {
Self::inject_barrier(
DatabaseId::new(self.info.streaming_job.database_id()),
table_id,
control_stream_manager,
&mut self.barrier_control,
Expand Down
Loading

0 comments on commit 07a21a7

Please sign in to comment.