Skip to content

Commit

Permalink
remove
Browse files Browse the repository at this point in the history
fxi
  • Loading branch information
xxhZs committed May 6, 2024
1 parent 3b88872 commit de03bed
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 49 deletions.
4 changes: 2 additions & 2 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ message Sink {

message Subscription {
enum SubscriptionState {
INIT = 0;
CREATE = 1;
INIT = 1;
CREATE = 2;
}
uint32 id = 1;
string name = 2;
Expand Down
4 changes: 0 additions & 4 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ message BroadcastActorInfoTableRequest {

// Create channels and gRPC connections for a fragment
message BuildActorsRequest {
message SubscriptionIds {
repeated uint32 subscription_ids = 1;
}
string request_id = 1;
repeated uint32 actor_id = 2;
map<uint32, SubscriptionIds> related_subscriptions = 3;
}

message BuildActorsResponse {
Expand Down
5 changes: 0 additions & 5 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -966,11 +966,6 @@ impl PlanRoot {
)
}

/// Set the plan root's required dist.
pub fn set_required_dist(&mut self, required_dist: RequiredDist) {
self.required_dist = required_dist;
}

pub fn should_use_arrangement_backfill(&self) -> bool {
let ctx = self.plan.ctx();
let session_ctx = ctx.session_ctx();
Expand Down
1 change: 0 additions & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,6 @@ impl GlobalBarrierManagerContext {
let actors = actors.iter().cloned().collect();
(*node_id, actors)
}),
&info.mv_depended_subscriptions,
)
.await?;

Expand Down
38 changes: 9 additions & 29 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@ use futures::future::try_join_all;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorId;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::stream_plan::{Barrier, BarrierMutation};
use risingwave_pb::stream_service::build_actors_request::SubscriptionIds;
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
BroadcastActorInfoTableRequest, BuildActorInfo, BuildActorsRequest, DropActorsRequest,
Expand Down Expand Up @@ -405,36 +403,18 @@ impl StreamRpcManager {
&self,
node_map: &HashMap<WorkerId, WorkerNode>,
node_actors: impl Iterator<Item = (WorkerId, Vec<ActorId>)>,
related_subscriptions: &HashMap<TableId, HashMap<u32, u64>>,
) -> MetaResult<()> {
self.make_request(
node_actors.map(|(worker_id, actors)| (node_map.get(&worker_id).unwrap(), actors)),
|client, actors| {
let related_subscriptions = related_subscriptions.clone();
async move {
let request_id = Self::new_request_id();
tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors");
client
.build_actors(BuildActorsRequest {
request_id,
actor_id: actors,
related_subscriptions: related_subscriptions
.iter()
.map(|(table_id, subscriptions)| {
(
table_id.table_id,
SubscriptionIds {
subscription_ids: subscriptions
.keys()
.cloned()
.collect(),
},
)
})
.collect(),
})
.await
}
|client, actors| async move {
let request_id = Self::new_request_id();
tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors");
client
.build_actors(BuildActorsRequest {
request_id,
actor_id: actors,
})
.await
},
)
.await?;
Expand Down
4 changes: 0 additions & 4 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,10 +852,6 @@ impl ScaleController {
.collect_vec(),
)
}),
&self
.metadata_manager
.get_mv_depended_subscriptions()
.await?,
)
.await?;

Expand Down
4 changes: 0 additions & 4 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,10 +384,6 @@ impl GlobalStreamManager {
.build_actors(
&building_locations.worker_locations,
building_worker_actors.into_iter(),
&self
.metadata_manager
.get_mv_depended_subscriptions()
.await?,
)
.await?;

Expand Down

0 comments on commit de03bed

Please sign in to comment.