feat(meta): support inject barrier to enable and disable writing old value for subscription #16460

Merged
merged 24 commits
May 6, 2024
Changes from 20 commits
Commits
24 commits
43c84c3
feat(storage): store old value in memtable
wenym1 Apr 22, 2024
eff6a5c
add delete only test epoch
wenym1 Apr 22, 2024
beb2823
Merge branch 'main' into yiming/old-value-memtable
wenym1 Apr 22, 2024
e989c03
fix
wenym1 Apr 22, 2024
461560c
maintain log store table id in command ctx
wenym1 Apr 23, 2024
e4529d5
Merge branch 'main' into yiming/old-value-memtable
wenym1 Apr 23, 2024
4c797aa
support truncate
wenym1 Apr 23, 2024
0a4824f
feat(meta): support inject barrier to enable and disable writing old …
wenym1 Apr 24, 2024
b6044fa
commit with log store info
wenym1 Apr 24, 2024
842e828
fix measure batch size
wenym1 Apr 24, 2024
5c9d3d8
fix test
wenym1 Apr 24, 2024
4ea21d9
Merge branch 'main' into yiming/old-value-memtable
wenym1 Apr 24, 2024
4af4cb8
Merge branch 'yiming/old-value-memtable' into yiming/subscription-bar…
wenym1 Apr 24, 2024
798877d
refactor(state-table): make consistent op an enum in state table
wenym1 Apr 24, 2024
5576d6f
add comment
wenym1 Apr 24, 2024
3dc9fdf
apply comment
wenym1 Apr 24, 2024
9038ae5
Merge branch 'yiming/state-table-op-consistency-enum' into yiming/sub…
wenym1 Apr 25, 2024
9ea5174
Merge branch 'main' into yiming/state-table-op-consistency-enum
wenym1 Apr 25, 2024
d13768d
Merge branch 'yiming/state-table-op-consistency-enum' into yiming/sub…
wenym1 Apr 25, 2024
3a9ce59
Merge branch 'main' into yiming/subscription-barrier
wenym1 Apr 26, 2024
03a5a18
Merge branch 'main' into yiming/subscription-barrier
wenym1 Apr 29, 2024
79ec559
refine
wenym1 Apr 29, 2024
015cf80
Merge branch 'main' into yiming/subscription-barrier
wenym1 Apr 30, 2024
6e447e9
add log to new command
wenym1 Apr 30, 2024
14 changes: 14 additions & 0 deletions proto/stream_plan.proto
@@ -94,6 +94,16 @@ message CombinedMutation {
repeated BarrierMutation mutations = 1;
}

message CreateSubscriptionMutation {
uint32 subscription_id = 1;
uint32 upstream_mv_table_id = 2;
}

message DropSubscriptionMutation {
uint32 subscription_id = 1;
uint32 upstream_mv_table_id = 2;
}

message BarrierMutation {
oneof mutation {
// Add new dispatchers to some actors, used for creating materialized views.
@@ -111,6 +121,10 @@ message BarrierMutation {
ResumeMutation resume = 8;
// Throttle specific source exec or chain exec.
ThrottleMutation throttle = 10;
// Create subscription on mv
CreateSubscriptionMutation create_subscription = 11;
// Drop subscription on mv
DropSubscriptionMutation drop_subscription = 12;
// Combined mutation.
CombinedMutation combined = 100;
}
4 changes: 4 additions & 0 deletions proto/stream_service.proto
@@ -25,8 +25,12 @@ message BroadcastActorInfoTableRequest {

// Create channels and gRPC connections for a fragment
message BuildActorsRequest {
message SubscriptionIds {
repeated uint32 subscription_ids = 1;
}
string request_id = 1;
repeated uint32 actor_id = 2;
map<uint32, SubscriptionIds> related_subscriptions = 3;
Member

Placing such a field here (and all the related code paths) looks quite ad-hoc to me... Why isn't it a field in stream_plan::StreamActor?

A related question: how is this information persisted?

Contributor Author

We just use the subscription information persisted in catalog.

I considered adding it as a field of StreamActor. But since StreamActor is also persisted, we would have to carefully maintain consistency between the subscription information persisted in the catalog and the copy in StreamActor. Such a field in StreamActor would be redundant, so the effort to maintain that consistency is unnecessary.

So in the end I chose the current implementation.

Plus: I think the dispatcher information also does not need to be persisted in StreamActor, since, IIUC, it can be recalculated once we know the upstream and downstream distributions; persisting it is redundant and would again require carefully maintaining consistency.

Member
@fuyufjh fuyufjh Apr 29, 2024

We just use the subscription information persisted in catalog.

This sounds ok to me.

My concern is about this field related_subscriptions. Basically, the BuildActorsRequest RPC and all the subsequent calls only pass actor_ids, while additional information such as the definition of the actors is stored in metadata and collected later. To keep the code consistent and clean, I think we should follow the same style here.

Plus:

I think the dispatcher information also does not need to be persisted in StreamActor

Each actor needs to know exactly which downstream actors it should connect to. Imagine that when one CN fails and restarts, the actors on it will be recreated while the other CNs don't need to do so. Is such information preserved somewhere?

Contributor Author

Each actor needs to know exactly which downstream actors it should connect to. Imagine that when one CN fails and restarts, the actors on it will be recreated while the other CNs don't need to do so. Is such information preserved somewhere?

IIUC, the dispatch information can be calculated as long as we have the vnode distribution of the upstream and downstream. For example, if the upstream actor owns vnodes [1, 2, 3] and we know that downstream actor A owns vnode 1, B owns vnode 2, and C owns vnode 3, then we know the upstream actor should dispatch to downstream actors A, B, and C.

Currently, when a CN fails, a recovery is triggered and all actors are rebuilt. The meta node has the persisted vnode distribution of all actors, from which the dispatcher information can be recalculated, so each actor can know which actors to connect to.
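
To make the idea concrete, here is a minimal, hypothetical sketch (not code from this PR; types are simplified stand-ins) of deriving dispatch targets purely from vnode ownership on both sides:

```rust
use std::collections::{HashMap, HashSet};

type ActorId = u32;
type VirtualNode = usize;

/// Hypothetical helper: the upstream actor must dispatch to every downstream
/// actor that owns at least one vnode the upstream actor also owns.
fn downstream_targets(
    upstream_vnodes: &HashSet<VirtualNode>,
    downstream_vnode_ownership: &HashMap<ActorId, HashSet<VirtualNode>>,
) -> Vec<ActorId> {
    downstream_vnode_ownership
        .iter()
        .filter(|(_, owned)| !owned.is_disjoint(upstream_vnodes))
        .map(|(actor_id, _)| *actor_id)
        .collect()
}

fn main() {
    // Upstream actor owns vnodes 1, 2, 3; downstream actors A(100), B(101), C(102)
    // own vnodes 1, 2 and 3 respectively, so all three are dispatch targets.
    let upstream: HashSet<VirtualNode> = [1, 2, 3].into_iter().collect();
    let downstream: HashMap<ActorId, HashSet<VirtualNode>> = HashMap::from([
        (100, [1].into_iter().collect()),
        (101, [2].into_iter().collect()),
        (102, [3].into_iter().collect()),
    ]);
    let mut targets = downstream_targets(&upstream, &downstream);
    targets.sort();
    assert_eq!(targets, vec![100, 101, 102]);
}
```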

Member

For example, if the upstream actor owns vnodes [1, 2, 3] and we know that downstream actor A owns vnode 1, B owns vnode 2, and C owns vnode 3, then we know the upstream actor should dispatch to downstream actors A, B, and C.

I see. You mean the vnode_bitmap in the downstream actors, which defines the data distribution of all state tables in that fragment/those actors. Technically, it seems possible to have different distribution keys in the shuffle and the state table, even though I believe in our system they are always the same. 🤔

}

message BuildActorsResponse {
17 changes: 16 additions & 1 deletion src/compute/src/rpc/service/stream_service.rs
@@ -14,6 +14,7 @@

use await_tree::InstrumentAwait;
use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_common::catalog::TableId;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
@@ -65,7 +66,21 @@ impl StreamService for StreamServiceImpl {
let req = request.into_inner();

let actor_id = req.actor_id;
let res = self.mgr.build_actors(actor_id).await;
let res = self
.mgr
.build_actors(
actor_id,
req.related_subscriptions
.into_iter()
.map(|(table_id, subscriptions)| {
(
TableId::new(table_id),
subscriptions.subscription_ids.into_iter().collect(),
)
})
.collect(),
)
.await;
match res {
Err(e) => {
error!(error = %e.as_report(), "failed to build actors");
51 changes: 48 additions & 3 deletions src/meta/src/barrier/command.rs
@@ -21,6 +21,8 @@ use itertools::Itertools;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::catalog::CreateType;
@@ -32,12 +34,13 @@ use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers, PauseMutation,
ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor, ThrottleMutation,
UpdateMutation,
AddMutation, BarrierMutation, CombinedMutation, CreateSubscriptionMutation, Dispatcher,
Dispatchers, DropSubscriptionMutation, PauseMutation, ResumeMutation,
SourceChangeSplitMutation, StopMutation, StreamActor, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use thiserror_ext::AsReport;
use tracing::warn;

use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
use super::trace::TracedEpoch;
@@ -201,6 +204,17 @@ pub enum Command {
/// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
/// the `rate_limit` of `FlowControl` Executor after `StreamScan` or Source.
Throttle(ThrottleConfig),

CreateSubscription {
subscription_id: u32,
upstream_mv_table_id: TableId,
retention_second: u64,
},

DropSubscription {
subscription_id: u32,
upstream_mv_table_id: TableId,
},
}

impl Command {
@@ -285,6 +299,8 @@ impl Command {
Command::ReplaceTable(plan) => Some(plan.actor_changes()),
Command::SourceSplitAssignment(_) => None,
Command::Throttle(_) => None,
Command::CreateSubscription { .. } => None,
Command::DropSubscription { .. } => None,
}
}

@@ -663,6 +679,22 @@ impl CommandContext {
tracing::debug!("update mutation: {mutation:?}");
Some(mutation)
}

Command::CreateSubscription {
upstream_mv_table_id,
subscription_id,
..
} => Some(Mutation::CreateSubscription(CreateSubscriptionMutation {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscription_id: *subscription_id,
})),
Command::DropSubscription {
upstream_mv_table_id,
subscription_id,
} => Some(Mutation::DropSubscription(DropSubscriptionMutation {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscription_id: *subscription_id,
})),
};

mutation
@@ -1071,8 +1103,21 @@ impl CommandContext {
)
.await;
}

Command::CreateSubscription { .. } => {}
Command::DropSubscription { .. } => {}
}

Ok(())
}

pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
let Some(truncate_timestamptz) = Timestamptz::from_secs(
self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
) else {
warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
return self.prev_epoch.value();
};
Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
}
}
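
As a rough illustration of `get_truncate_epoch` above — a standalone sketch using plain millisecond timestamps instead of the repo's `Epoch`/`Timestamptz` types, with hypothetical names — the truncate watermark is the previous barrier's timestamp minus the retention, falling back to the previous epoch when the subtraction is invalid:

```rust
// Hypothetical helper mirroring the rule in `get_truncate_epoch`, for intuition only.
fn truncate_watermark_ms(prev_epoch_ms: i64, retention_second: i64) -> i64 {
    let candidate = prev_epoch_ms - retention_second * 1000;
    if candidate < 0 {
        // Mirrors the `warn!` branch: an invalid retention falls back to the
        // previous epoch, i.e. nothing gets truncated early.
        prev_epoch_ms
    } else {
        candidate
    }
}

fn main() {
    // A subscription with 1 hour of retention: only data older than one hour
    // before the current barrier may be truncated from the log store.
    let prev_epoch_ms = 1_714_000_000_000; // some barrier commit time
    assert_eq!(
        truncate_watermark_ms(prev_epoch_ms, 3600),
        prev_epoch_ms - 3_600_000
    );
}
```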
54 changes: 49 additions & 5 deletions src/meta/src/barrier/info.rs
@@ -14,9 +14,11 @@

use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_pb::common::PbWorkerNode;
use tracing::warn;

use crate::barrier::Command;
use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, WorkerId};
use crate::model::ActorId;

@@ -48,11 +50,18 @@ pub struct InflightActorInfo {

/// `actor_id` => `WorkerId`
pub actor_location_map: HashMap<ActorId, WorkerId>,

/// mv_table_id => subscription_id => retention seconds
pub mv_depended_subscriptions: HashMap<TableId, HashMap<u32, u64>>,
}

impl InflightActorInfo {
/// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors.
pub fn resolve(active_nodes: &ActiveStreamingWorkerNodes, actor_infos: ActorInfos) -> Self {
pub fn resolve(
active_nodes: &ActiveStreamingWorkerNodes,
actor_infos: ActorInfos,
mv_depended_subscriptions: HashMap<TableId, HashMap<u32, u64>>,
) -> Self {
let node_map = active_nodes.current().clone();

let actor_map = actor_infos
@@ -77,6 +86,7 @@ impl InflightActorInfo {
actor_map,
actor_map_to_send,
actor_location_map,
mv_depended_subscriptions,
}
}

@@ -96,8 +106,8 @@

/// Apply some actor changes before issuing a barrier command, if the command contains any new added actors, we should update
/// the info correspondingly.
pub fn pre_apply(&mut self, changes: Option<CommandActorChanges>) {
if let Some(CommandActorChanges { to_add, .. }) = changes {
pub fn pre_apply(&mut self, command: &Command) {
if let Some(CommandActorChanges { to_add, .. }) = command.actor_changes() {
for actor_desc in to_add {
assert!(self.node_map.contains_key(&actor_desc.node_id));
assert!(
@@ -124,12 +134,27 @@
);
}
};
if let Command::CreateSubscription {
subscription_id,
upstream_mv_table_id,
retention_second,
} = command
{
if let Some(prev_retention) = self
.mv_depended_subscriptions
.entry(*upstream_mv_table_id)
.or_default()
.insert(*subscription_id, *retention_second)
{
warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retention, "add an existing subscription id");
}
}
}

/// Apply some actor changes after the barrier command is collected, if the command contains any actors that are dropped, we should
/// remove that from the snapshot correspondingly.
pub fn post_apply(&mut self, changes: Option<CommandActorChanges>) {
if let Some(CommandActorChanges { to_remove, .. }) = changes {
pub fn post_apply(&mut self, command: &Command) {
if let Some(CommandActorChanges { to_remove, .. }) = command.actor_changes() {
for actor_id in to_remove {
let node_id = self
.actor_location_map
@@ -145,6 +170,25 @@
self.actor_map_to_send
.retain(|_, actor_ids| !actor_ids.is_empty());
}
if let Command::DropSubscription {
subscription_id,
upstream_mv_table_id,
} = command
{
let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
Some(subscriptions) => {
let removed = subscriptions.remove(subscription_id).is_some();
if removed && subscriptions.is_empty() {
self.mv_depended_subscriptions.remove(upstream_mv_table_id);
}
removed
}
None => false,
};
if !removed {
warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
}
}
}

/// Returns actor list to collect in the target worker node.
23 changes: 18 additions & 5 deletions src/meta/src/barrier/mod.rs
@@ -16,7 +16,6 @@ use std::assert_matches::assert_matches;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::pending;
use std::iter::empty;
use std::mem::{replace, take};
use std::sync::Arc;
use std::time::Duration;
@@ -1084,16 +1083,20 @@ impl GlobalBarrierManagerContext {
&self,
active_nodes: &ActiveStreamingWorkerNodes,
) -> MetaResult<InflightActorInfo> {
let subscriptions = self
.metadata_manager
.get_mv_depended_subscriptions()
.await?;
let info = match &self.metadata_manager {
MetadataManager::V1(mgr) => {
let all_actor_infos = mgr.fragment_manager.load_all_actors().await;

InflightActorInfo::resolve(active_nodes, all_actor_infos)
InflightActorInfo::resolve(active_nodes, all_actor_infos, subscriptions)
}
MetadataManager::V2(mgr) => {
let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;

InflightActorInfo::resolve(active_nodes, all_actor_infos)
InflightActorInfo::resolve(active_nodes, all_actor_infos, subscriptions)
}
};

@@ -1187,8 +1190,18 @@ fn collect_commit_epoch_info(
old_value_ssts.into_iter(),
synced_ssts.iter().map(|sst| &sst.sst_info),
epochs,
// TODO: pass log store table id and the corresponding truncate_epoch
empty(),
command_ctx
.info
.mv_depended_subscriptions
.iter()
.filter_map(|(mv_table_id, subscriptions)| {
subscriptions.values().max().map(|max_retention| {
(
mv_table_id.table_id,
command_ctx.get_truncate_epoch(*max_retention).0,
)
})
}),
);

CommitEpochInfo::new(
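To spell out the rule in the `filter_map` above: for each upstream MV table, the truncate epoch is driven by the largest retention among that table's subscriptions, so the most demanding subscriber wins. A minimal sketch with simplified types (plain u32 table ids instead of `TableId`; not code from this PR):

```rust
use std::collections::HashMap;

// mv_table_id => subscription_id => retention seconds, mirroring
// `mv_depended_subscriptions` with simplified types.
fn max_retention_per_table(
    mv_depended_subscriptions: &HashMap<u32, HashMap<u32, u64>>,
) -> HashMap<u32, u64> {
    mv_depended_subscriptions
        .iter()
        .filter_map(|(mv_table_id, subscriptions)| {
            // Tables with no subscriptions contribute no entry, matching the
            // `filter_map` in `collect_commit_epoch_info`.
            subscriptions
                .values()
                .max()
                .map(|max_retention| (*mv_table_id, *max_retention))
        })
        .collect()
}

fn main() {
    let subs = HashMap::from([(42, HashMap::from([(1, 60), (2, 3600)]))]);
    // Subscription 2 keeps one hour of history, so table 42 must retain
    // at least one hour of log store data before truncation.
    assert_eq!(max_retention_per_table(&subs), HashMap::from([(42, 3600)]));
}
```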
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
@@ -1171,6 +1171,7 @@ impl GlobalBarrierManagerContext {
let actors = actors.iter().cloned().collect();
(*node_id, actors)
}),
&info.mv_depended_subscriptions,
)
.await?;

38 changes: 29 additions & 9 deletions src/meta/src/barrier/rpc.rs
@@ -24,10 +24,12 @@ use futures::future::try_join_all;
use futures::stream::{BoxStream, FuturesUnordered};
use futures::{pin_mut, FutureExt, StreamExt};
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorId;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor};
use risingwave_pb::stream_service::build_actors_request::SubscriptionIds;
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, InjectBarrierRequest,
@@ -402,18 +404,36 @@ impl StreamRpcManager {
&self,
node_map: &HashMap<WorkerId, WorkerNode>,
node_actors: impl Iterator<Item = (WorkerId, Vec<ActorId>)>,
related_subscriptions: &HashMap<TableId, HashMap<u32, u64>>,
) -> MetaResult<()> {
self.make_request(
node_actors.map(|(worker_id, actors)| (node_map.get(&worker_id).unwrap(), actors)),
|client, actors| async move {
let request_id = Self::new_request_id();
tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors");
client
.build_actors(BuildActorsRequest {
request_id,
actor_id: actors,
})
.await
|client, actors| {
let related_subscriptions = related_subscriptions.clone();
async move {
let request_id = Self::new_request_id();
tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors");
client
.build_actors(BuildActorsRequest {
request_id,
actor_id: actors,
related_subscriptions: related_subscriptions
.iter()
.map(|(table_id, subscriptions)| {
(
table_id.table_id,
SubscriptionIds {
subscription_ids: subscriptions
.keys()
.cloned()
.collect(),
},
)
})
.collect(),
})
.await
}
},
)
.await?;