feat(meta): support inject barrier to enable and disable writing old value for subscription #16460

Merged · 24 commits · May 6, 2024
Changes from 22 commits
Commits (24)
43c84c3
feat(storage): store old value in memtable
wenym1 Apr 22, 2024
eff6a5c
add delete only test epoch
wenym1 Apr 22, 2024
beb2823
Merge branch 'main' into yiming/old-value-memtable
wenym1 Apr 22, 2024
e989c03
fix
wenym1 Apr 22, 2024
461560c
maintain log store table id in command ctx
wenym1 Apr 23, 2024
e4529d5
Merge branch 'main' into yiming/old-value-memtable
wenym1 Apr 23, 2024
4c797aa
support truncate
wenym1 Apr 23, 2024
0a4824f
feat(meta): support inject barrier to enable and disable writing old …
wenym1 Apr 24, 2024
b6044fa
commit with log store info
wenym1 Apr 24, 2024
842e828
fix measure batch size
wenym1 Apr 24, 2024
5c9d3d8
fix test
wenym1 Apr 24, 2024
4ea21d9
Merge branch 'main' into yiming/old-value-memtable
wenym1 Apr 24, 2024
4af4cb8
Merge branch 'yiming/old-value-memtable' into yiming/subscription-bar…
wenym1 Apr 24, 2024
798877d
refactor(state-table): make consistent op an enum in state table
wenym1 Apr 24, 2024
5576d6f
add comment
wenym1 Apr 24, 2024
3dc9fdf
apply comment
wenym1 Apr 24, 2024
9038ae5
Merge branch 'yiming/state-table-op-consistency-enum' into yiming/sub…
wenym1 Apr 25, 2024
9ea5174
Merge branch 'main' into yiming/state-table-op-consistency-enum
wenym1 Apr 25, 2024
d13768d
Merge branch 'yiming/state-table-op-consistency-enum' into yiming/sub…
wenym1 Apr 25, 2024
3a9ce59
Merge branch 'main' into yiming/subscription-barrier
wenym1 Apr 26, 2024
03a5a18
Merge branch 'main' into yiming/subscription-barrier
wenym1 Apr 29, 2024
79ec559
refine
wenym1 Apr 29, 2024
015cf80
Merge branch 'main' into yiming/subscription-barrier
wenym1 Apr 30, 2024
6e447e9
add log to new command
wenym1 Apr 30, 2024
14 changes: 14 additions & 0 deletions proto/stream_plan.proto
@@ -94,6 +94,16 @@ message CombinedMutation {
  repeated BarrierMutation mutations = 1;
}

message CreateSubscriptionMutation {
  uint32 subscription_id = 1;
  uint32 upstream_mv_table_id = 2;
}

message DropSubscriptionMutation {
  uint32 subscription_id = 1;
  uint32 upstream_mv_table_id = 2;
}

message BarrierMutation {
  oneof mutation {
    // Add new dispatchers to some actors, used for creating materialized views.
@@ -111,6 +121,10 @@ message BarrierMutation {
    ResumeMutation resume = 8;
    // Throttle specific source exec or chain exec.
    ThrottleMutation throttle = 10;
    // Create subscription on mv.
    CreateSubscriptionMutation create_subscription = 11;
    // Drop subscription on mv.
    DropSubscriptionMutation drop_subscription = 12;
    // Combined mutation.
    CombinedMutation combined = 100;
  }
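For reference, a minimal sketch (not part of this diff) of how the new oneof variants surface through the prost-generated Rust types; it mirrors what `CommandContext::to_mutation` builds below for a `CreateSubscription` command, assuming the standard prost codegen used elsewhere in the repo (the oneof becomes an `Option` of a generated enum):

```rust
// Sketch only: construct a barrier mutation announcing a new subscription.
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::{BarrierMutation, CreateSubscriptionMutation};

fn create_subscription_barrier_mutation(
    subscription_id: u32,
    upstream_mv_table_id: u32,
) -> BarrierMutation {
    BarrierMutation {
        mutation: Some(Mutation::CreateSubscription(CreateSubscriptionMutation {
            subscription_id,
            upstream_mv_table_id,
        })),
    }
}
```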
10 changes: 9 additions & 1 deletion proto/stream_service.proto
@@ -9,10 +9,18 @@
option java_package = "com.risingwave.proto";
option optimize_for = SPEED;

message BuildActorInfo {
  stream_plan.StreamActor actor = 1;
  message SubscriptionIds {
    repeated uint32 subscription_ids = 1;
  }
  map<uint32, SubscriptionIds> related_subscriptions = 2;
}

// Describe the fragments which will be running on this node
message UpdateActorsRequest {
  string request_id = 1;
  repeated stream_plan.StreamActor actors = 2;
  repeated BuildActorInfo actors = 2;

[Check failure on line 23 in proto/stream_service.proto, from GitHub Actions / Check breaking changes in Protobuf files]
Field "2" on message "UpdateActorsRequest" changed type from "stream_plan.StreamActor" to "stream_service.BuildActorInfo".
}

message UpdateActorsResponse {
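A hedged consumer-side sketch (not in this PR's hunks): given a received `BuildActorInfo`, collect the subscription ids registered for one table. This assumes prost's default codegen, under which the map field becomes a `HashMap` and the nested message's fields are plain `Vec`s:

```rust
use risingwave_pb::stream_service::BuildActorInfo;

// Sketch only: look up the subscriptions that depend on `table_id` for this actor.
fn subscription_ids_for_table(info: &BuildActorInfo, table_id: u32) -> Vec<u32> {
    info.related_subscriptions
        .get(&table_id)
        .map(|ids| ids.subscription_ids.clone())
        .unwrap_or_default()
}
```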
51 changes: 48 additions & 3 deletions src/meta/src/barrier/command.rs
@@ -21,6 +21,8 @@ use itertools::Itertools;
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::catalog::CreateType;
@@ -32,12 +34,13 @@ use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
    AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers, PauseMutation,
    ResumeMutation, SourceChangeSplitMutation, StopMutation, StreamActor, ThrottleMutation,
    UpdateMutation,
    AddMutation, BarrierMutation, CombinedMutation, CreateSubscriptionMutation, Dispatcher,
    Dispatchers, DropSubscriptionMutation, PauseMutation, ResumeMutation,
    SourceChangeSplitMutation, StopMutation, StreamActor, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use thiserror_ext::AsReport;
use tracing::warn;

use super::info::{ActorDesc, CommandActorChanges, InflightActorInfo};
use super::trace::TracedEpoch;
@@ -209,6 +212,17 @@ pub enum Command {
    /// `Throttle` command generates a `Throttle` barrier with the given throttle config to change
    /// the `rate_limit` of `FlowControl` Executor after `StreamScan` or Source.
    Throttle(ThrottleConfig),

    CreateSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
        retention_second: u64,
    },

    DropSubscription {
        subscription_id: u32,
        upstream_mv_table_id: TableId,
    },
}

impl Command {
@@ -293,6 +307,8 @@ impl Command {
            Command::ReplaceTable(plan) => Some(plan.actor_changes()),
            Command::SourceSplitAssignment(_) => None,
            Command::Throttle(_) => None,
            Command::CreateSubscription { .. } => None,
            Command::DropSubscription { .. } => None,
        }
    }

@@ -671,6 +687,22 @@ impl CommandContext {
                tracing::debug!("update mutation: {mutation:?}");
                Some(mutation)
            }

            Command::CreateSubscription {
                upstream_mv_table_id,
                subscription_id,
                ..
            } => Some(Mutation::CreateSubscription(CreateSubscriptionMutation {
                upstream_mv_table_id: upstream_mv_table_id.table_id,
                subscription_id: *subscription_id,
            })),
            Command::DropSubscription {
                upstream_mv_table_id,
                subscription_id,
            } => Some(Mutation::DropSubscription(DropSubscriptionMutation {
                upstream_mv_table_id: upstream_mv_table_id.table_id,
                subscription_id: *subscription_id,
            })),
        };

        mutation
@@ -1079,8 +1111,21 @@ impl CommandContext {
                )
                .await;
            }

            Command::CreateSubscription { .. } => {}
            Command::DropSubscription { .. } => {}
        }

        Ok(())
    }

    pub fn get_truncate_epoch(&self, retention_second: u64) -> Epoch {
        let Some(truncate_timestamptz) = Timestamptz::from_secs(
            self.prev_epoch.value().as_timestamptz().timestamp() - retention_second as i64,
        ) else {
            warn!(retention_second, prev_epoch = ?self.prev_epoch.value(), "invalid retention second value");
            return self.prev_epoch.value();
        };
        Epoch::from_unix_millis(truncate_timestamptz.timestamp_millis() as u64)
    }
}
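A small worked example of the arithmetic in `get_truncate_epoch` (values invented for illustration): the truncate epoch is simply the epoch whose physical time lies `retention_second` seconds before the barrier's `prev_epoch`. `Epoch::from_unix_millis` and the `.0` tuple access are both confirmed by this diff:

```rust
use risingwave_common::util::epoch::Epoch;

// Sketch only: prev_epoch at unix time 1_700_003_600_000 ms with retention 3600 s
// yields a truncate epoch at 1_700_000_000_000 ms, i.e. log-store data older than
// one hour before the barrier becomes eligible for truncation.
fn truncate_epoch_example() {
    let prev_epoch_ms: u64 = 1_700_003_600_000;
    let retention_second: u64 = 3600;
    let truncate = Epoch::from_unix_millis(prev_epoch_ms - retention_second * 1000);
    let prev = Epoch::from_unix_millis(prev_epoch_ms);
    assert!(truncate.0 < prev.0); // epoch ordering follows physical time
}
```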
54 changes: 49 additions & 5 deletions src/meta/src/barrier/info.rs
@@ -14,9 +14,11 @@

use std::collections::{HashMap, HashSet};

use risingwave_common::catalog::TableId;
use risingwave_pb::common::PbWorkerNode;
use tracing::warn;

use crate::barrier::Command;
use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, WorkerId};
use crate::model::ActorId;

@@ -48,11 +50,18 @@ pub struct InflightActorInfo {

    /// `actor_id` => `WorkerId`
    pub actor_location_map: HashMap<ActorId, WorkerId>,

    /// `mv_table_id` => `subscription_id` => retention seconds
    pub mv_depended_subscriptions: HashMap<TableId, HashMap<u32, u64>>,
}

impl InflightActorInfo {
    /// Resolve inflight actor info from given nodes and actors that are loaded from meta store. It will be used during recovery to rebuild all streaming actors.
    pub fn resolve(active_nodes: &ActiveStreamingWorkerNodes, actor_infos: ActorInfos) -> Self {
    pub fn resolve(
        active_nodes: &ActiveStreamingWorkerNodes,
        actor_infos: ActorInfos,
        mv_depended_subscriptions: HashMap<TableId, HashMap<u32, u64>>,
    ) -> Self {
        let node_map = active_nodes.current().clone();

        let actor_map = actor_infos
@@ -77,6 +86,7 @@ impl InflightActorInfo {
            actor_map,
            actor_map_to_send,
            actor_location_map,
            mv_depended_subscriptions,
        }
    }

@@ -96,8 +106,8 @@

    /// Apply some actor changes before issuing a barrier command. If the command contains any newly added actors, we should update
    /// the info correspondingly.
    pub fn pre_apply(&mut self, changes: Option<CommandActorChanges>) {
        if let Some(CommandActorChanges { to_add, .. }) = changes {
    pub fn pre_apply(&mut self, command: &Command) {
        if let Some(CommandActorChanges { to_add, .. }) = command.actor_changes() {
            for actor_desc in to_add {
                assert!(self.node_map.contains_key(&actor_desc.node_id));
                assert!(
@@ -124,12 +134,27 @@
                );
            }
        };
        if let Command::CreateSubscription {
            subscription_id,
            upstream_mv_table_id,
            retention_second,
        } = command
        {
            if let Some(prev_retention) = self
                .mv_depended_subscriptions
                .entry(*upstream_mv_table_id)
                .or_default()
                .insert(*subscription_id, *retention_second)
            {
                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, prev_retention, "add an existing subscription id");
            }
        }
    }

    /// Apply some actor changes after the barrier command is collected. If the command contains any actors that are dropped, we should
    /// remove them from the snapshot correspondingly.
    pub fn post_apply(&mut self, changes: Option<CommandActorChanges>) {
        if let Some(CommandActorChanges { to_remove, .. }) = changes {
    pub fn post_apply(&mut self, command: &Command) {
        if let Some(CommandActorChanges { to_remove, .. }) = command.actor_changes() {
            for actor_id in to_remove {
                let node_id = self
                    .actor_location_map
@@ -145,6 +170,25 @@
            self.actor_map_to_send
                .retain(|_, actor_ids| !actor_ids.is_empty());
        }
        if let Command::DropSubscription {
            subscription_id,
            upstream_mv_table_id,
        } = command
        {
            let removed = match self.mv_depended_subscriptions.get_mut(upstream_mv_table_id) {
                Some(subscriptions) => {
                    let removed = subscriptions.remove(subscription_id).is_some();
                    if removed && subscriptions.is_empty() {
                        self.mv_depended_subscriptions.remove(upstream_mv_table_id);
                    }
                    removed
                }
                None => false,
            };
            if !removed {
                warn!(subscription_id, ?upstream_mv_table_id, mv_depended_subscriptions = ?self.mv_depended_subscriptions, "remove a non-existing subscription id");
            }
        }
    }

    /// Returns actor list to collect in the target worker node.
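The bookkeeping above can be restated as a tiny standalone model (sketch only; the real code uses `TableId` and lives on `InflightActorInfo`): creates are registered in `pre_apply`, before the barrier is injected, while drops are deregistered in `post_apply`, only after the barrier is collected, so in-flight barriers still see the subscription.

```rust
use std::collections::HashMap;

/// Miniature stand-in for `mv_depended_subscriptions` maintenance:
/// mv table id => subscription id => retention seconds.
struct Subscriptions(HashMap<u32, HashMap<u32, u64>>);

impl Subscriptions {
    fn create(&mut self, table: u32, sub: u32, retention_second: u64) {
        // pre_apply: register before the CreateSubscription barrier is injected.
        self.0.entry(table).or_default().insert(sub, retention_second);
    }

    fn drop_sub(&mut self, table: u32, sub: u32) {
        // post_apply: deregister only after the DropSubscription barrier is collected.
        if let Some(subs) = self.0.get_mut(&table) {
            subs.remove(&sub);
            if subs.is_empty() {
                self.0.remove(&table); // keep the outer map free of empty entries
            }
        }
    }
}
```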
23 changes: 18 additions & 5 deletions src/meta/src/barrier/mod.rs
@@ -16,7 +16,6 @@ use std::assert_matches::assert_matches;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::future::pending;
use std::iter::empty;
use std::mem::{replace, take};
use std::sync::Arc;
use std::time::Duration;
@@ -1084,16 +1083,20 @@ impl GlobalBarrierManagerContext {
        &self,
        active_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<InflightActorInfo> {
        let subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions()
            .await?;
        let info = match &self.metadata_manager {
            MetadataManager::V1(mgr) => {
                let all_actor_infos = mgr.fragment_manager.load_all_actors().await;

                InflightActorInfo::resolve(active_nodes, all_actor_infos)
                InflightActorInfo::resolve(active_nodes, all_actor_infos, subscriptions)
            }
            MetadataManager::V2(mgr) => {
                let all_actor_infos = mgr.catalog_controller.load_all_actors().await?;

                InflightActorInfo::resolve(active_nodes, all_actor_infos)
                InflightActorInfo::resolve(active_nodes, all_actor_infos, subscriptions)
            }
        };

@@ -1187,8 +1190,18 @@ fn collect_commit_epoch_info(
        old_value_ssts.into_iter(),
        synced_ssts.iter().map(|sst| &sst.sst_info),
        epochs,
        // TODO: pass log store table id and the corresponding truncate_epoch
        empty(),
        command_ctx
            .info
            .mv_depended_subscriptions
            .iter()
            .filter_map(|(mv_table_id, subscriptions)| {
                subscriptions.values().max().map(|max_retention| {
                    (
                        mv_table_id.table_id,
                        command_ctx.get_truncate_epoch(*max_retention).0,
                    )
                })
            }),
    );

    CommitEpochInfo::new(
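A standalone restatement of the derivation above (sketch, with a closure standing in for `CommandContext::get_truncate_epoch`): each MV table keeps the largest retention among its subscriptions, because the log store must hold old values long enough for the most demanding subscriber.

```rust
use std::collections::HashMap;

// mv_table_id => (subscription_id => retention seconds) in,
// mv_table_id => truncate epoch out.
fn truncate_epochs(
    subscriptions: &HashMap<u32, HashMap<u32, u64>>,
    get_truncate_epoch: impl Fn(u64) -> u64, // stand-in for CommandContext::get_truncate_epoch
) -> HashMap<u32, u64> {
    subscriptions
        .iter()
        .filter_map(|(table_id, subs)| {
            // max() over retention seconds picks the most demanding subscriber.
            subs.values()
                .max()
                .map(|max_retention| (*table_id, get_truncate_epoch(*max_retention)))
        })
        .collect()
}
```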
11 changes: 6 additions & 5 deletions src/meta/src/barrier/rpc.rs
@@ -27,11 +27,12 @@ use itertools::Itertools;
use risingwave_common::hash::ActorId;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor};
use risingwave_pb::stream_plan::{Barrier, BarrierMutation};
use risingwave_pb::stream_service::{
    streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
    BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, InjectBarrierRequest,
    StreamingControlStreamRequest, StreamingControlStreamResponse, UpdateActorsRequest,
    BroadcastActorInfoTableRequest, BuildActorInfo, BuildActorsRequest, DropActorsRequest,
    InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse,
    UpdateActorsRequest,
};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::StreamClient;
@@ -427,7 +428,7 @@ impl StreamRpcManager {
        worker_nodes: &HashMap<WorkerId, WorkerNode>,
        broadcast_worker_ids: impl Iterator<Item = WorkerId>,
        actor_infos_to_broadcast: impl Iterator<Item = ActorInfo>,
        node_actors_to_create: impl Iterator<Item = (WorkerId, Vec<StreamActor>)>,
        node_actors_to_create: impl Iterator<Item = (WorkerId, Vec<BuildActorInfo>)>,
    ) -> MetaResult<()> {
        let actor_infos = actor_infos_to_broadcast.collect_vec();
        let mut node_actors_to_create = node_actors_to_create.collect::<HashMap<_, _>>();
@@ -446,7 +447,7 @@
            .await?;
        if let Some(actors) = actors {
            let request_id = Self::new_request_id();
            let actor_ids = actors.iter().map(|actor| actor.actor_id).collect_vec();
            let actor_ids = actors
                .iter()
                .map(|actor| actor.actor.as_ref().unwrap().actor_id)
                .collect_vec();
            tracing::debug!(request_id = request_id.as_str(), actors = ?actor_ids, "update actors");
            client
                .update_actors(UpdateActorsRequest { request_id, actors })
5 changes: 2 additions & 3 deletions src/meta/src/barrier/state.rs
@@ -76,10 +76,9 @@ impl BarrierManagerState {
    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
    /// will be removed from the state after the info gets resolved.
    pub fn apply_command(&mut self, command: &Command) -> InflightActorInfo {
        let changes = command.actor_changes();
        self.inflight_actor_infos.pre_apply(changes.clone());
        self.inflight_actor_infos.pre_apply(command);
        let info = self.inflight_actor_infos.clone();
        self.inflight_actor_infos.post_apply(changes);
        self.inflight_actor_infos.post_apply(command);

        info
    }
13 changes: 10 additions & 3 deletions src/meta/src/manager/catalog/fragment.rs
@@ -33,6 +33,7 @@ use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
use risingwave_pb::stream_plan::{
    DispatchStrategy, Dispatcher, DispatcherType, FragmentTypeFlag, StreamActor, StreamNode,
};
use risingwave_pb::stream_service::BuildActorInfo;
use tokio::sync::{RwLock, RwLockReadGuard};

use crate::barrier::Reschedule;
@@ -43,7 +44,7 @@ use crate::model::{
    TableParallelism, ValTransaction,
};
use crate::storage::Transaction;
use crate::stream::{SplitAssignment, TableRevision};
use crate::stream::{to_build_actor_info, SplitAssignment, TableRevision};
use crate::{MetaError, MetaResult};

pub struct FragmentManagerCore {
@@ -849,14 +850,20 @@ impl FragmentManager {
    pub async fn all_node_actors(
        &self,
        include_inactive: bool,
    ) -> HashMap<WorkerId, Vec<StreamActor>> {
        subscriptions: &HashMap<TableId, HashMap<u32, u64>>,
    ) -> HashMap<WorkerId, Vec<BuildActorInfo>> {
        let mut actor_maps = HashMap::new();

        let map = &self.core.read().await.table_fragments;
        for fragments in map.values() {
            let table_id = fragments.table_id();
            for (node_id, actors) in fragments.worker_actors(include_inactive) {
                let node_actors = actor_maps.entry(node_id).or_insert_with(Vec::new);
                node_actors.extend(actors);
                node_actors.extend(
                    actors
                        .into_iter()
                        .map(|actor| to_build_actor_info(actor, subscriptions, table_id)),
                );
            }
        }

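The helper `to_build_actor_info` imported above is not shown in these hunks. A plausible sketch follows, with the signature inferred from the call site and the field shapes from `proto/stream_service.proto`; treat the body and the `pub(crate)` visibility as assumptions.

```rust
use std::collections::HashMap;

use risingwave_common::catalog::TableId;
use risingwave_pb::stream_plan::StreamActor;
use risingwave_pb::stream_service::build_actor_info::SubscriptionIds;
use risingwave_pb::stream_service::BuildActorInfo;

// Sketch only: pair an actor with the subscription ids recorded for its table.
pub(crate) fn to_build_actor_info(
    actor: StreamActor,
    subscriptions: &HashMap<TableId, HashMap<u32, u64>>,
    table_id: TableId,
) -> BuildActorInfo {
    let related_subscriptions = subscriptions
        .get(&table_id)
        .map(|subs| {
            (
                table_id.table_id,
                SubscriptionIds {
                    subscription_ids: subs.keys().cloned().collect(),
                },
            )
        })
        .into_iter()
        .collect();
    BuildActorInfo {
        actor: Some(actor),
        related_subscriptions,
    }
}
```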