Skip to content

Commit

Permalink
add func
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Jan 23, 2024
1 parent 5b888e9 commit df1bff2
Show file tree
Hide file tree
Showing 8 changed files with 452 additions and 70 deletions.
1 change: 1 addition & 0 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ pub async fn rpc_serve(
Some(backend) => {
let mut options = sea_orm::ConnectOptions::new(backend.endpoint);
options
.sqlx_logging(false)
.max_connections(20)
.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(30));
Expand Down
33 changes: 17 additions & 16 deletions src/meta/service/src/scale_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl ScaleServiceImpl {
async fn get_revision(&self) -> TableRevision {
match &self.metadata_manager {
MetadataManager::V1(mgr) => mgr.fragment_manager.get_revision().await,
MetadataManager::V2(_) => unimplemented!("support table revision in v2"),
MetadataManager::V2(_) => Default::default(),
}
}
}
Expand Down Expand Up @@ -141,10 +141,6 @@ impl ScaleService for ScaleServiceImpl {
) -> Result<Response<RescheduleResponse>, Status> {
self.barrier_manager.check_status_running().await?;

let MetadataManager::V1(mgr) = &self.metadata_manager else {
unimplemented!("only available in v1");
};

let RescheduleRequest {
reschedules,
revision,
Expand All @@ -163,19 +159,24 @@ impl ScaleService for ScaleServiceImpl {
}

let table_parallelisms = {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

let mut table_parallelisms = HashMap::new();
for (table_id, table) in guard.table_fragments() {
if table
.fragment_ids()
.any(|fragment_id| reschedules.contains_key(&fragment_id))
{
table_parallelisms.insert(*table_id, TableParallelism::Custom);
match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let guard = mgr.fragment_manager.get_fragment_read_guard().await;

let mut table_parallelisms = HashMap::new();
for (table_id, table) in guard.table_fragments() {
if table
.fragment_ids()
.any(|fragment_id| reschedules.contains_key(&fragment_id))
{
table_parallelisms.insert(*table_id, TableParallelism::Custom);
}
}

table_parallelisms
}
MetadataManager::V2(_) => HashMap::new(),
}

table_parallelisms
};

self.stream_manager
Expand Down
24 changes: 14 additions & 10 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::barrier::BarrierKind;
Expand All @@ -31,8 +32,8 @@ use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers, FragmentTypeFlag,
PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, ThrottleMutation,
UpdateMutation,
PauseMutation, ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor,
ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::{DropActorsRequest, WaitEpochCommitRequest};
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -76,6 +77,8 @@ pub struct Reschedule {
/// Whether this fragment is injectable. The injectable means whether the fragment contains
/// any executors that are able to receive barrier.
pub injectable: bool,

pub newly_created_actors: Vec<(StreamActor, PbActorStatus)>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -479,18 +482,20 @@ impl CommandContext {
),

Command::RescheduleFragment { reschedules, .. } => {
let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
else {
unimplemented!("implement scale functions in v2");
};
// let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager
// else {
// unimplemented!("implement scale functions in v2");
// };

let metadata_manager = &self.barrier_manager_context.metadata_manager;

let mut dispatcher_update = HashMap::new();
for reschedule in reschedules.values() {
for &(upstream_fragment_id, dispatcher_id) in
&reschedule.upstream_fragment_dispatcher_ids
{
// Find the actors of the upstream fragment.
let upstream_actor_ids = mgr
.fragment_manager
let upstream_actor_ids = metadata_manager
.get_running_actors_of_fragment(upstream_fragment_id)
.await?;

Expand Down Expand Up @@ -528,8 +533,7 @@ impl CommandContext {
for (&fragment_id, reschedule) in reschedules {
for &downstream_fragment_id in &reschedule.downstream_fragment_ids {
// Find the actors of the downstream fragment.
let downstream_actor_ids = mgr
.fragment_manager
let downstream_actor_ids = metadata_manager
.get_running_actors_of_fragment(downstream_fragment_id)
.await?;

Expand Down
16 changes: 8 additions & 8 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use thiserror_ext::AsReport;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time;
use tracing::Instrument;

use self::command::CommandContext;
Expand Down Expand Up @@ -394,14 +395,12 @@ impl GlobalBarrierManager {

let tracker = CreateMviewProgressTracker::new();

let scale_controller = match &metadata_manager {
MetadataManager::V1(_) => Some(Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
env.clone(),
))),
MetadataManager::V2(_) => None,
};
let scale_controller = Some(Arc::new(ScaleController::new(
&metadata_manager,
source_manager.clone(),
env.clone(),
)));

let context = GlobalBarrierManagerContext {
status: Arc::new(Mutex::new(BarrierManagerStatus::Starting)),
metadata_manager,
Expand Down Expand Up @@ -742,6 +741,7 @@ impl GlobalBarrierManager {
.await;
self.context.set_status(BarrierManagerStatus::Running).await;
} else {
time::sleep(Duration::from_secs(10)).await;
panic!("failed to execute barrier: {}", err.as_report());
}
}
Expand Down
Loading

0 comments on commit df1bff2

Please sign in to comment.