diff --git a/e2e_test/batch/transaction/read_only_multi_conn.slt b/e2e_test/batch/transaction/read_only_multi_conn.slt
index c19ef54b0b619..2e51c2d4e8fce 100644
--- a/e2e_test/batch/transaction/read_only_multi_conn.slt
+++ b/e2e_test/batch/transaction/read_only_multi_conn.slt
@@ -7,9 +7,15 @@ insert into t values (1), (2);
 statement ok
 flush;

+connection txn
+statement ok
+SET VISIBILITY_MODE TO checkpoint;
+
+connection txn
 statement ok
 start transaction read only;

+connection txn
 query I
 select count(*) from t;
 ----
@@ -42,24 +48,29 @@ select count(*) from t;
 3

 # ...but not in the read-only transaction
+connection txn
 query I
 select count(*) from t;
 ----
 2

+connection txn
 statement ok
 flush;

 # still invisible even after flush
+connection txn
 query I
 select count(*) from t;
 ----
 2

+connection txn
 statement ok
 commit;

 # now visible outside the transaction
+connection txn
 query I
 select count(*) from t;
 ----
diff --git a/proto/hummock.proto b/proto/hummock.proto
index 9015cb44d42a3..19b7e036c9686 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -239,7 +239,8 @@ message HummockSnapshot {
   // Epoch with checkpoint, we will read durable data with it.
   uint64 committed_epoch = 1;
   // Epoch without checkpoint, we will read real-time data with it. But it may be rolled back.
-  uint64 current_epoch = 2;
+  reserved 2;
+  reserved "current_epoch";
 }

 message VersionUpdatePayload {
diff --git a/proto/stream_service.proto b/proto/stream_service.proto
index 419a61f113edf..54ffc3d5ff79c 100644
--- a/proto/stream_service.proto
+++ b/proto/stream_service.proto
@@ -17,31 +17,6 @@ message BuildActorInfo {
   map<uint32, SubscriptionIds> related_subscriptions = 2;
 }

-// Describe the fragments which will be running on this node
-message UpdateActorsRequest {
-  string request_id = 1;
-  repeated BuildActorInfo actors = 2;
-}
-
-message UpdateActorsResponse {
-  common.Status status = 1;
-}
-
-message BroadcastActorInfoTableRequest {
-  repeated common.ActorInfo info = 1;
-}
-
-// Create channels and gRPC connections for a fragment
-message BuildActorsRequest {
-  string request_id = 1;
-  repeated uint32 actor_id = 2;
-}
-
-message BuildActorsResponse {
-  string request_id = 1;
-  common.Status status = 2;
-}
-
 message DropActorsRequest {
   string request_id = 1;
   repeated uint32 actor_ids = 2;
@@ -68,6 +43,9 @@ message InjectBarrierRequest {
   // we specify the set of snapshot backfill actors that needs to be pre-synced with the upstream barrier mutation,
   // so that the input executor won't be blocked at waiting for the mutation of upstream barriers.
   repeated uint32 actor_ids_to_pre_sync_barrier_mutation = 7;
+
+  repeated common.ActorInfo broadcast_info = 8;
+  repeated BuildActorInfo actors_to_build = 9;
 }

 message BarrierCompleteResponse {
@@ -95,11 +73,6 @@ message BarrierCompleteResponse {
   uint64 epoch = 9;
 }

-// Before starting streaming, the leader node broadcast the actor-host table to needed workers.
-message BroadcastActorInfoTableResponse {
-  common.Status status = 1;
-}
-
 message WaitEpochCommitRequest {
   uint64 epoch = 1;
 }
@@ -136,9 +109,6 @@ message StreamingControlStreamResponse {
 }

 service StreamService {
-  rpc UpdateActors(UpdateActorsRequest) returns (UpdateActorsResponse);
-  rpc BuildActors(BuildActorsRequest) returns (BuildActorsResponse);
-  rpc BroadcastActorInfoTable(BroadcastActorInfoTableRequest) returns (BroadcastActorInfoTableResponse);
   rpc DropActors(DropActorsRequest) returns (DropActorsResponse);
   rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
   rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs
index 2222f09e45f3d..eb055a174b3ea 100644
--- a/src/compute/src/rpc/service/stream_service.rs
+++ b/src/compute/src/rpc/service/stream_service.rs
@@ -19,7 +19,6 @@ use risingwave_pb::stream_service::*;
 use risingwave_storage::dispatch_state_store;
 use risingwave_stream::error::StreamError;
 use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
-use thiserror_ext::AsReport;
 use tokio::sync::mpsc::unbounded_channel;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tonic::{Request, Response, Status, Streaming};
@@ -41,62 +40,6 @@ impl StreamService for StreamServiceImpl {
     type StreamingControlStreamStream =
         impl Stream<Item = std::result::Result<StreamingControlStreamResponse, Status>>;

-    #[cfg_attr(coverage, coverage(off))]
-    async fn update_actors(
-        &self,
-        request: Request<UpdateActorsRequest>,
-    ) -> std::result::Result<Response<UpdateActorsResponse>, Status> {
-        let req = request.into_inner();
-        let res = self.mgr.update_actors(req.actors).await;
-        match res {
-            Err(e) => {
-                error!(error = %e.as_report(), "failed to update stream actor");
-                Err(e.into())
-            }
-            Ok(()) => Ok(Response::new(UpdateActorsResponse { status: None })),
-        }
-    }
-
-    #[cfg_attr(coverage, coverage(off))]
-    async fn build_actors(
-        &self,
-        request: Request<BuildActorsRequest>,
-    ) -> std::result::Result<Response<BuildActorsResponse>, Status> {
-        let req = request.into_inner();
-
-        let actor_id = req.actor_id;
-        let res = self.mgr.build_actors(actor_id).await;
-        match res {
-            Err(e) => {
-                error!(error = %e.as_report(), "failed to build actors");
-                Err(e.into())
-            }
-            Ok(()) => Ok(Response::new(BuildActorsResponse {
-                request_id: req.request_id,
-                status: None,
-            })),
-        }
-    }
-
-    #[cfg_attr(coverage, coverage(off))]
-    async fn broadcast_actor_info_table(
-        &self,
-        request: Request<BroadcastActorInfoTableRequest>,
-    ) -> std::result::Result<Response<BroadcastActorInfoTableResponse>, Status> {
-        let req = request.into_inner();
-
-        let res = self.mgr.update_actor_info(req.info).await;
-        match res {
-            Err(e) => {
-                error!(error = %e.as_report(), "failed to update actor info table actor");
-                Err(e.into())
-            }
-            Ok(()) => Ok(Response::new(BroadcastActorInfoTableResponse {
-                status: None,
-            })),
-        }
-    }
-
     #[cfg_attr(coverage, coverage(off))]
     async fn drop_actors(
         &self,
diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs
index ad2b4044b3c3b..68f20fcf7d581 100644
--- a/src/connector/src/sink/kafka.rs
+++ b/src/connector/src/sink/kafka.rs
@@ -61,10 +61,6 @@ const fn _default_retry_backoff() -> Duration {
     Duration::from_millis(100)
 }

-const fn _default_message_timeout_ms() -> usize {
-    5000
-}
-
 const fn _default_max_in_flight_requests_per_connection() -> usize {
     5
 }
@@ -150,12 +146,9 @@ pub struct RdKafkaPropertiesProducer {
     /// Produce message timeout.
     /// This value is used to limits the time a produced message waits for
     /// successful delivery (including retries).
-    #[serde(
-        rename = "properties.message.timeout.ms",
-        default = "_default_message_timeout_ms"
-    )]
-    #[serde_as(as = "DisplayFromStr")]
-    message_timeout_ms: usize,
+    #[serde(rename = "properties.message.timeout.ms")]
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    message_timeout_ms: Option<usize>,

     /// The maximum number of unacknowledged requests the client will send on a single connection before blocking.
     #[serde(
@@ -205,7 +198,9 @@ impl RdKafkaPropertiesProducer {
         if let Some(v) = self.request_required_acks {
             c.set("request.required.acks", v.to_string());
         }
-        c.set("message.timeout.ms", self.message_timeout_ms.to_string());
+        if let Some(v) = self.message_timeout_ms {
+            c.set("message.timeout.ms", v.to_string());
+        }
         c.set(
             "max.in.flight.requests.per.connection",
             self.max_in_flight_requests_per_connection.to_string(),
@@ -626,7 +621,10 @@ mod test {
             c.rdkafka_properties_producer.compression_codec,
             Some(CompressionCodec::Zstd)
         );
-        assert_eq!(c.rdkafka_properties_producer.message_timeout_ms, 114514);
+        assert_eq!(
+            c.rdkafka_properties_producer.message_timeout_ms,
+            Some(114514)
+        );
         assert_eq!(
             c.rdkafka_properties_producer
                 .max_in_flight_requests_per_connection,
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index 9d704d31942e9..f321de880c72c 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -531,7 +531,6 @@ KafkaConfig:
       This value is used to limits the time a produced message waits for
       successful delivery (including retries).
     required: false
-    default: '5000'
   - name: properties.max.in.flight.requests.per.connection
     field_type: usize
     comments: The maximum number of unacknowledged requests the client will send on a single connection before blocking.
diff --git a/src/frontend/src/handler/create_table.rs b/src/frontend/src/handler/create_table.rs
index 825ee595b4b29..0359280d28adc 100644
--- a/src/frontend/src/handler/create_table.rs
+++ b/src/frontend/src/handler/create_table.rs
@@ -56,7 +56,7 @@ use crate::catalog::source_catalog::SourceCatalog;
 use crate::catalog::table_catalog::TableVersion;
 use crate::catalog::{check_valid_column_name, ColumnId, DatabaseId, SchemaId};
 use crate::error::{ErrorCode, Result, RwError};
-use crate::expr::{Expr, ExprImpl, ExprRewriter, InlineNowProcTime};
+use crate::expr::{Expr, ExprImpl, ExprRewriter};
 use crate::handler::create_source::{
     bind_columns_from_source, bind_connector_props, bind_create_source_or_table_with_connector,
     bind_source_watermark, handle_addition_columns, UPSTREAM_SOURCE_KEY,
@@ -329,9 +329,10 @@ pub fn bind_sql_column_constraints(
             // so the rewritten expression should almost always be pure and we directly call `fold_const`
             // here. Actually we do not require purity of the expression here since we're only to get a
             // snapshot value.
-            let rewritten_expr_impl =
-                InlineNowProcTime::new(session.pinned_snapshot().epoch())
-                    .rewrite_expr(expr_impl.clone());
+            let rewritten_expr_impl = session
+                .pinned_snapshot()
+                .inline_now_proc_time()
+                .rewrite_expr(expr_impl.clone());

             if let Some(snapshot_value) = rewritten_expr_impl.try_fold_const() {
                 let snapshot_value = snapshot_value?;
diff --git a/src/frontend/src/optimizer/logical_optimization.rs b/src/frontend/src/optimizer/logical_optimization.rs
index de9db6f8f22d2..93da63cf70a42 100644
--- a/src/frontend/src/optimizer/logical_optimization.rs
+++ b/src/frontend/src/optimizer/logical_optimization.rs
@@ -18,7 +18,7 @@ use risingwave_common::bail;
 use super::plan_node::RewriteExprsRecursive;
 use super::plan_visitor::has_logical_max_one_row;
 use crate::error::Result;
-use crate::expr::{InlineNowProcTime, NowProcTimeFinder};
+use crate::expr::NowProcTimeFinder;
 use crate::optimizer::heuristic_optimizer::{ApplyOrder, HeuristicOptimizer};
 use crate::optimizer::plan_node::{
     ColumnPruningContext, PredicatePushdownContext, VisitExprsRecursive,
@@ -540,8 +540,7 @@ impl LogicalOptimizer {
             return plan;
         }

-        let epoch = ctx.session_ctx().pinned_snapshot().epoch();
-        let mut v = InlineNowProcTime::new(epoch);
+        let mut v = ctx.session_ctx().pinned_snapshot().inline_now_proc_time();

         let plan = plan.rewrite_exprs_recursive(&mut v);

diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs
index 73a3ade5799b5..4eb23e97dd5f7 100644
--- a/src/frontend/src/scheduler/snapshot.rs
+++ b/src/frontend/src/scheduler/snapshot.rs
@@ -25,6 +25,7 @@ use thiserror_ext::AsReport;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tokio::sync::watch;

+use crate::expr::InlineNowProcTime;
 use crate::meta_client::FrontendMetaClient;

 /// The interval between two unpin batches.
@@ -60,23 +61,12 @@ impl ReadSnapshot {
         }
     }

-    /// Get the [`Option`] value for this snapshot, only `FrontendPinned`.
-    pub fn epoch_with_frontend_pinned(&self) -> Option<Epoch> {
-        match self.batch_query_epoch().epoch.unwrap() {
-            batch_query_epoch::Epoch::Committed(epoch)
-            | batch_query_epoch::Epoch::Current(epoch) => Some(epoch.into()),
-            batch_query_epoch::Epoch::Backup(_) | batch_query_epoch::Epoch::TimeTravel(_) => None,
-        }
-    }
-
-    /// Get the [`Epoch`] value for this snapshot.
-    pub fn epoch(&self) -> Epoch {
-        match self.batch_query_epoch().epoch.unwrap() {
-            batch_query_epoch::Epoch::Committed(epoch)
-            | batch_query_epoch::Epoch::Current(epoch)
-            | batch_query_epoch::Epoch::Backup(epoch)
-            | batch_query_epoch::Epoch::TimeTravel(epoch) => epoch.into(),
-        }
+    pub fn inline_now_proc_time(&self) -> InlineNowProcTime {
+        let epoch = match self {
+            ReadSnapshot::FrontendPinned { snapshot, .. } => Epoch(snapshot.committed_epoch()),
+            ReadSnapshot::Other(epoch) => *epoch,
+        };
+        InlineNowProcTime::new(epoch)
     }

     /// Returns true if this snapshot is a barrier read.
@@ -111,12 +101,16 @@ pub type PinnedSnapshotRef = Arc; impl PinnedSnapshot { fn batch_query_epoch(&self, is_barrier_read: bool) -> BatchQueryEpoch { let epoch = if is_barrier_read { - batch_query_epoch::Epoch::Current(self.value.current_epoch) + batch_query_epoch::Epoch::Current(u64::MAX) } else { batch_query_epoch::Epoch::Committed(self.value.committed_epoch) }; BatchQueryEpoch { epoch: Some(epoch) } } + + pub fn committed_epoch(&self) -> u64 { + self.value.committed_epoch + } } impl Drop for PinnedSnapshot { @@ -129,7 +123,6 @@ impl Drop for PinnedSnapshot { fn invalid_snapshot() -> PbHummockSnapshot { PbHummockSnapshot { committed_epoch: INVALID_EPOCH, - current_epoch: INVALID_EPOCH, } } @@ -191,10 +184,6 @@ impl HummockSnapshotManager { old_snapshot.value.committed_epoch, snapshot.committed_epoch, ), - current_epoch: std::cmp::max( - old_snapshot.value.current_epoch, - snapshot.current_epoch, - ), }; if old_snapshot.value == snapshot { @@ -251,7 +240,7 @@ impl Operation { match self { Operation::Pin(s) | Operation::Unpin(s) => s, } - .current_epoch + .committed_epoch == INVALID_EPOCH } } @@ -266,8 +255,7 @@ impl Eq for SnapshotKey {} impl Ord for SnapshotKey { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - (self.0.committed_epoch, self.0.current_epoch) - .cmp(&(other.0.committed_epoch, other.0.current_epoch)) + self.0.committed_epoch.cmp(&other.0.committed_epoch) } } diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs index 68832d6824718..2a17c3ee0dc2e 100644 --- a/src/frontend/src/session/cursor_manager.rs +++ b/src/frontend/src/session/cursor_manager.rs @@ -47,7 +47,7 @@ use crate::handler::HandlerArgs; use crate::optimizer::plan_node::{generic, BatchLogSeqScan}; use crate::optimizer::property::{Order, RequiredDist}; use crate::optimizer::PlanRoot; -use crate::scheduler::{DistributedQueryStream, LocalQueryStream}; +use crate::scheduler::{DistributedQueryStream, LocalQueryStream, ReadSnapshot}; use crate::{OptimizerContext, OptimizerContextRef, PgResponseStream, PlanRef, TableCatalog}; pub enum CursorDataChunkStream { @@ -247,19 +247,14 @@ impl SubscriptionCursor { // TODO: is this the right behavior? Should we delay the query stream initiation till the first fetch? let (chunk_stream, fields) = Self::initiate_query(None, &dependent_table_id, handle_args.clone()).await?; - let pinned_epoch = handle_args - .session - .get_pinned_snapshot() - .ok_or_else(|| { - ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_string()) - })? - .epoch_with_frontend_pinned() - .ok_or_else(|| { - ErrorCode::InternalError( - "Fetch Cursor can't support setting an epoch".to_string(), - ) - })? - .0; + let pinned_epoch = match handle_args.session.get_pinned_snapshot().ok_or_else(|| { + ErrorCode::InternalError("Fetch Cursor can't find snapshot epoch".to_string()) + })? { + ReadSnapshot::FrontendPinned { snapshot, .. } => snapshot.committed_epoch(), + ReadSnapshot::Other(_) => { + return Err(ErrorCode::InternalError("Fetch Cursor can't start from specified query epoch. 
May run `set query_epoch = 0;`".to_string()).into()); + } + }; let start_timestamp = pinned_epoch; ( diff --git a/src/frontend/src/session/transaction.rs b/src/frontend/src/session/transaction.rs index cdc2b0e7d692c..d14df7f4900a2 100644 --- a/src/frontend/src/session/transaction.rs +++ b/src/frontend/src/session/transaction.rs @@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Weak}; use parking_lot::{MappedMutexGuard, Mutex, MutexGuard}; +use risingwave_common::session_config::VisibilityMode; use risingwave_hummock_sdk::EpochWithGap; use super::SessionImpl; @@ -137,6 +138,11 @@ impl SessionImpl { // explicit transaction. State::Initial => unreachable!("no implicit transaction in progress"), State::Implicit(ctx) => { + if self.config().visibility_mode() == VisibilityMode::All { + self.notice_to_user( + "`visibility_mode` is set to `All`, and there is no consistency ensured in the transaction", + ); + } *txn = State::Explicit(Context { id: ctx.id, access_mode, diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 086d4ff7de251..ee6ff589e0cdb 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -938,24 +938,15 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn try_unregister(&self) {} async fn pin_snapshot(&self) -> RpcResult { - Ok(HummockSnapshot { - committed_epoch: 0, - current_epoch: 0, - }) + Ok(HummockSnapshot { committed_epoch: 0 }) } async fn get_snapshot(&self) -> RpcResult { - Ok(HummockSnapshot { - committed_epoch: 0, - current_epoch: 0, - }) + Ok(HummockSnapshot { committed_epoch: 0 }) } async fn flush(&self, _checkpoint: bool) -> RpcResult { - Ok(HummockSnapshot { - committed_epoch: 0, - current_epoch: 0, - }) + Ok(HummockSnapshot { committed_epoch: 0 }) } async fn wait(&self) -> RpcResult<()> { diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 04b8fed711dd1..0bea5f37940d6 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -844,6 +844,42 @@ impl CommandContext { mutation } + pub fn actors_to_create(&self) -> Option>> { + match &self.command { + Command::CreateStreamingJob { info, job_type } => { + let mut map = match job_type { + CreateStreamingJobType::Normal => HashMap::new(), + CreateStreamingJobType::SinkIntoTable(replace_table) => { + replace_table.new_table_fragments.actors_to_create() + } + CreateStreamingJobType::SnapshotBackfill(_) => { + // for snapshot backfill, the actors to create is measured separately + return None; + } + }; + for (worker_id, new_actors) in info.table_fragments.actors_to_create() { + map.entry(worker_id).or_default().extend(new_actors) + } + Some(map) + } + Command::RescheduleFragment { reschedules, .. 
} => { + let mut map: HashMap> = HashMap::new(); + for (actor, status) in reschedules + .values() + .flat_map(|reschedule| reschedule.newly_created_actors.iter()) + { + let worker_id = status.location.as_ref().unwrap().worker_node_id; + map.entry(worker_id).or_default().push(actor.clone()); + } + Some(map) + } + Command::ReplaceTable(replace_table) => { + Some(replace_table.new_table_fragments.actors_to_create()) + } + _ => None, + } + } + fn generate_update_mutation_for_replace_table( old_table_fragments: &TableFragments, merge_updates: &[MergeUpdate], diff --git a/src/meta/src/barrier/creating_job/mod.rs b/src/meta/src/barrier/creating_job/mod.rs index 0d478fcf66142..c5a52437e2b7d 100644 --- a/src/meta/src/barrier/creating_job/mod.rs +++ b/src/meta/src/barrier/creating_job/mod.rs @@ -21,20 +21,23 @@ use std::mem::take; use std::sync::Arc; use std::time::Duration; +use itertools::Itertools; use prometheus::HistogramTimer; use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge}; use risingwave_common::util::epoch::Epoch; use risingwave_pb::common::WorkerNode; use risingwave_pb::ddl_service::DdlProgress; use risingwave_pb::hummock::HummockVersionStats; -use risingwave_pb::stream_service::BarrierCompleteResponse; +use risingwave_pb::stream_service::{BarrierCompleteResponse, BuildActorInfo}; use tracing::{debug, info}; use crate::barrier::command::CommandContext; use crate::barrier::creating_job::barrier_control::{ CreatingStreamingJobBarrierControl, CreatingStreamingJobBarrierType, }; -use crate::barrier::creating_job::status::CreatingStreamingJobStatus; +use crate::barrier::creating_job::status::{ + CreatingJobInjectBarrierInfo, CreatingStreamingJobStatus, +}; use crate::barrier::info::InflightGraphInfo; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; @@ -89,6 +92,8 @@ impl CreatingStreamingJobControl { let table_id = info.table_fragments.table_id(); let table_id_str = format!("{}", table_id.table_id); + let actors_to_create = info.table_fragments.actors_to_create(); + Self { info, snapshot_backfill_info, @@ -103,6 +108,23 @@ impl CreatingStreamingJobControl { backfill_epoch, pending_non_checkpoint_barriers: vec![], snapshot_backfill_actors, + actors_to_create: Some( + actors_to_create + .into_iter() + .map(|(worker_id, actors)| { + ( + worker_id, + actors + .into_iter() + .map(|actor| BuildActorInfo { + actor: Some(actor), + related_subscriptions: Default::default(), + }) + .collect_vec(), + ) + }) + .collect(), + ), }, upstream_lag: metrics .snapshot_backfill_lag @@ -256,7 +278,13 @@ impl CreatingStreamingJobControl { .active_graph_info() .expect("must exist when having barriers to inject"); let table_id = self.info.table_fragments.table_id(); - for (curr_epoch, prev_epoch, kind) in barriers_to_inject { + for CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, + new_actors, + } in barriers_to_inject + { let node_to_collect = control_stream_manager.inject_barrier( Some(table_id), None, @@ -265,6 +293,7 @@ impl CreatingStreamingJobControl { graph_info, Some(graph_info), HashMap::new(), + new_actors, )?; self.barrier_control.enqueue_epoch( prev_epoch.value().0, @@ -329,6 +358,7 @@ impl CreatingStreamingJobControl { graph_info, Some(graph_info), HashMap::new(), + None, )?; self.barrier_control.enqueue_epoch( command_ctx.prev_epoch.value().0, @@ -375,6 +405,7 @@ impl CreatingStreamingJobControl { Some(graph_info) }, HashMap::new(), + None, )?; let prev_epoch = command_ctx.prev_epoch.value().0; 
self.barrier_control.enqueue_epoch( diff --git a/src/meta/src/barrier/creating_job/status.rs b/src/meta/src/barrier/creating_job/status.rs index f2d0022298345..0569752b1056b 100644 --- a/src/meta/src/barrier/creating_job/status.rs +++ b/src/meta/src/barrier/creating_job/status.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use risingwave_common::util::epoch::Epoch; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress; +use risingwave_pb::stream_service::BuildActorInfo; use crate::barrier::command::CommandContext; use crate::barrier::info::InflightGraphInfo; @@ -39,6 +40,7 @@ pub(super) enum CreatingStreamingJobStatus { /// The `prev_epoch` of pending non checkpoint barriers pending_non_checkpoint_barriers: Vec, snapshot_backfill_actors: HashMap>, + actors_to_create: Option>>, }, ConsumingLogStore { graph_info: InflightGraphInfo, @@ -53,6 +55,13 @@ pub(super) enum CreatingStreamingJobStatus { }, } +pub(super) struct CreatingJobInjectBarrierInfo { + pub curr_epoch: TracedEpoch, + pub prev_epoch: TracedEpoch, + pub kind: BarrierKind, + pub new_actors: Option>>, +} + impl CreatingStreamingJobStatus { pub(super) fn active_graph_info(&self) -> Option<&InflightGraphInfo> { match self { @@ -84,14 +93,10 @@ impl CreatingStreamingJobStatus { /// return /// - Some(vec[(`curr_epoch`, `prev_epoch`, `barrier_kind`)]) of barriers to newly inject /// - Some(`graph_info`) when the status should transit to `ConsumingLogStore` - #[expect(clippy::type_complexity)] pub(super) fn may_inject_fake_barrier( &mut self, is_checkpoint: bool, - ) -> Option<( - Vec<(TracedEpoch, TracedEpoch, BarrierKind)>, - Option, - )> { + ) -> Option<(Vec, Option)> { if let CreatingStreamingJobStatus::ConsumingSnapshot { prev_epoch_fake_physical_time, pending_commands, @@ -99,27 +104,32 @@ impl CreatingStreamingJobStatus { graph_info, pending_non_checkpoint_barriers, ref backfill_epoch, + actors_to_create, .. 
} = self { if create_mview_tracker.has_pending_finished_jobs() { + assert!(actors_to_create.is_none()); pending_non_checkpoint_barriers.push(*backfill_epoch); let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time); - let barriers_to_inject = [( - TracedEpoch::new(Epoch(*backfill_epoch)), - TracedEpoch::new(prev_epoch), - BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)), - )] - .into_iter() - .chain(pending_commands.drain(..).map(|command_ctx| { - ( - command_ctx.curr_epoch.clone(), - command_ctx.prev_epoch.clone(), - command_ctx.kind.clone(), - ) - })) - .collect(); + let barriers_to_inject = + [CreatingJobInjectBarrierInfo { + curr_epoch: TracedEpoch::new(Epoch(*backfill_epoch)), + prev_epoch: TracedEpoch::new(prev_epoch), + kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)), + new_actors: None, + }] + .into_iter() + .chain(pending_commands.drain(..).map(|command_ctx| { + CreatingJobInjectBarrierInfo { + curr_epoch: command_ctx.curr_epoch.clone(), + prev_epoch: command_ctx.prev_epoch.clone(), + kind: command_ctx.kind.clone(), + new_actors: None, + } + })) + .collect(); let graph_info = take(graph_info); Some((barriers_to_inject, Some(graph_info))) @@ -135,7 +145,15 @@ impl CreatingStreamingJobStatus { } else { BarrierKind::Barrier }; - Some((vec![(curr_epoch, prev_epoch, kind)], None)) + Some(( + vec![CreatingJobInjectBarrierInfo { + curr_epoch, + prev_epoch, + kind, + new_actors: actors_to_create.take(), + }], + None, + )) } } else { None diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 4d62f8da5122d..5fc9dc5112a65 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -720,10 +720,6 @@ impl GlobalBarrierManager { { let latest_snapshot = self.context.hummock_manager.latest_snapshot(); - assert_eq!( - latest_snapshot.committed_epoch, latest_snapshot.current_epoch, - "persisted snapshot must be from a checkpoint barrier" - ); let prev_epoch = TracedEpoch::new(latest_snapshot.committed_epoch.into()); // Bootstrap recovery. Here we simply trigger a recovery process to achieve the @@ -1274,7 +1270,6 @@ impl GlobalBarrierManagerContext { ) -> MetaResult> { { { - let prev_epoch = command_ctx.prev_epoch.value().0; // We must ensure all epochs are committed in ascending order, // because the storage engine will query from new to old in the order in which // the L0 layer files are generated. @@ -1295,7 +1290,6 @@ impl GlobalBarrierManagerContext { new_snapshot = self.hummock_manager.commit_epoch(commit_info).await?; } BarrierKind::Barrier => { - new_snapshot = Some(self.hummock_manager.update_current_epoch(prev_epoch)); // if we collect a barrier(checkpoint = false), // we need to ensure that command is Plain and the notifier's checkpoint is // false diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 41884b28a64c1..25fe1fd2ceff7 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::collections::{BTreeSet, HashMap, HashSet}; -use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; @@ -23,25 +22,24 @@ use risingwave_common::config::DefaultParallelism; use risingwave_common::hash::WorkerSlotId; use risingwave_common::util::epoch::Epoch; use risingwave_meta_model_v2::StreamingParallelism; -use risingwave_pb::common::ActorInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::State; use risingwave_pb::meta::{PausedReason, Recovery}; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::AddMutation; +use risingwave_pb::stream_service::BuildActorInfo; use thiserror_ext::AsReport; use tokio::time::Instant; use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tracing::{debug, info, warn, Instrument}; use super::{CheckpointControl, TracedEpoch}; -use crate::barrier::command::CommandContext; use crate::barrier::info::{InflightGraphInfo, InflightSubscriptionInfo}; use crate::barrier::progress::CreateMviewProgressTracker; use crate::barrier::rpc::ControlStreamManager; use crate::barrier::schedule::ScheduledBarriers; use crate::barrier::state::BarrierManagerState; -use crate::barrier::{BarrierKind, Command, GlobalBarrierManager, GlobalBarrierManagerContext}; +use crate::barrier::{BarrierKind, GlobalBarrierManager, GlobalBarrierManagerContext}; use crate::controller::catalog::ReleaseContext; use crate::manager::{ActiveStreamingWorkerNodes, MetadataManager, WorkerId}; use crate::model::{MetadataModel, MigrationPlan, TableFragments, TableParallelism}; @@ -369,52 +367,38 @@ impl GlobalBarrierManager { }; // update and build all actors. - self.context - .update_actors(&info, &subscription_info, &active_streaming_nodes) + let node_actors = self + .context + .load_all_actors(&info, &subscription_info, &active_streaming_nodes) .await .inspect_err(|err| { warn!(error = %err.as_report(), "update actors failed"); })?; - self.context - .build_actors(&info, &active_streaming_nodes) - .await - .inspect_err(|err| { - warn!(error = %err.as_report(), "build_actors failed"); - })?; // get split assignments for all actors let source_split_assignments = self.context.source_manager.list_assignments().await; - let command = Command::Plain(Some(Mutation::Add(AddMutation { + let mutation = Mutation::Add(AddMutation { // Actors built during recovery is not treated as newly added actors. actor_dispatchers: Default::default(), added_actors: Default::default(), actor_splits: build_actor_connector_splits(&source_split_assignments), pause: paused_reason.is_some(), subscriptions_to_add: Default::default(), - }))); + }); // Use a different `curr_epoch` for each recovery attempt. let new_epoch = prev_epoch.next(); - // Inject the `Initial` barrier to initialize all executors. 
- let command_ctx = Arc::new(CommandContext::new( - active_streaming_nodes.current().clone(), - subscription_info.clone(), - prev_epoch.clone(), - new_epoch.clone(), - paused_reason, - command, - BarrierKind::Initial, - self.context.clone(), - tracing::Span::current(), // recovery span - )); - - let mut node_to_collect = control_stream_manager.inject_command_ctx_barrier( - &command_ctx, + let mut node_to_collect = control_stream_manager.inject_barrier( + None, + Some(mutation), + (&new_epoch, &prev_epoch), + &BarrierKind::Initial, &info, Some(&info), HashMap::new(), + Some(node_actors), )?; debug!(?node_to_collect, "inject initial barrier"); while !node_to_collect.is_empty() { @@ -422,18 +406,13 @@ impl GlobalBarrierManager { .next_complete_barrier_response() .await; let resp = result?; - assert_eq!(resp.epoch, command_ctx.prev_epoch.value().0); + assert_eq!(resp.epoch, prev_epoch.value().0); assert!(node_to_collect.remove(&worker_id)); } debug!("collected initial barrier"); ( - BarrierManagerState::new( - new_epoch, - info, - subscription_info, - command_ctx.next_paused_reason(), - ), + BarrierManagerState::new(new_epoch, info, subscription_info, paused_reason), active_streaming_nodes, control_stream_manager, tracker, @@ -1114,36 +1093,18 @@ impl GlobalBarrierManagerContext { } /// Update all actors in compute nodes. - async fn update_actors( + async fn load_all_actors( &self, info: &InflightGraphInfo, subscription_info: &InflightSubscriptionInfo, active_nodes: &ActiveStreamingWorkerNodes, - ) -> MetaResult<()> { + ) -> MetaResult>> { if info.actor_map.is_empty() { tracing::debug!("no actor to update, skipping."); - return Ok(()); + return Ok(HashMap::new()); } - let actor_infos: Vec<_> = info - .actor_map - .iter() - .map(|(node_id, actors)| { - let host = active_nodes - .current() - .get(node_id) - .ok_or_else(|| anyhow::anyhow!("worker evicted, wait for online."))? - .host - .clone(); - Ok(actors.iter().map(move |&actor_id| ActorInfo { - actor_id, - host: host.clone(), - })) as MetaResult<_> - }) - .flatten_ok() - .try_collect()?; - - let mut all_node_actors = self + let all_node_actors = self .metadata_manager .all_node_actors(false, &subscription_info.mv_depended_subscriptions) .await?; @@ -1160,45 +1121,7 @@ impl GlobalBarrierManagerContext { return Err(anyhow!("actors dropped during update").into()); } - self.stream_rpc_manager - .broadcast_update_actor_info( - active_nodes.current(), - info.actor_map.keys().cloned(), - actor_infos.into_iter(), - info.actor_map.keys().map(|node_id| { - ( - *node_id, - all_node_actors.remove(node_id).unwrap_or_default(), - ) - }), - ) - .await?; - - Ok(()) - } - - /// Build all actors in compute nodes. 
- async fn build_actors( - &self, - info: &InflightGraphInfo, - active_nodes: &ActiveStreamingWorkerNodes, - ) -> MetaResult<()> { - if info.actor_map.is_empty() { - tracing::debug!("no actor to build, skipping."); - return Ok(()); - } - - self.stream_rpc_manager - .build_actors( - active_nodes.current(), - info.actor_map.iter().map(|(node_id, actors)| { - let actors = actors.iter().cloned().collect(); - (*node_id, actors) - }), - ) - .await?; - - Ok(()) + Ok(all_node_actors) } } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 1b4ab6207db9a..7ad468b04aa4c 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -30,12 +30,12 @@ use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::common::{ActorInfo, WorkerNode}; use risingwave_pb::stream_plan::barrier_mutation::Mutation; use risingwave_pb::stream_plan::{Barrier, BarrierMutation}; +use risingwave_pb::stream_service::build_actor_info::SubscriptionIds; use risingwave_pb::stream_service::streaming_control_stream_request::RemovePartialGraphRequest; use risingwave_pb::stream_service::{ streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse, - BroadcastActorInfoTableRequest, BuildActorInfo, BuildActorsRequest, DropActorsRequest, - InjectBarrierRequest, StreamingControlStreamRequest, StreamingControlStreamResponse, - UpdateActorsRequest, + BuildActorInfo, DropActorsRequest, InjectBarrierRequest, StreamingControlStreamRequest, + StreamingControlStreamResponse, }; use risingwave_rpc_client::error::RpcError; use risingwave_rpc_client::StreamClient; @@ -263,6 +263,39 @@ impl ControlStreamManager { pre_applied_graph_info, applied_graph_info, actor_ids_to_pre_sync_mutation, + command_ctx.actors_to_create().map(|actors_to_create| { + actors_to_create + .into_iter() + .map(|(worker_id, actors)| { + ( + worker_id, + actors + .into_iter() + .map(|actor| BuildActorInfo { + actor: Some(actor), + // TODO: consider subscriber of backfilling mv + related_subscriptions: command_ctx + .subscription_info + .mv_depended_subscriptions + .iter() + .map(|(table_id, subscriptions)| { + ( + table_id.table_id, + SubscriptionIds { + subscription_ids: subscriptions + .keys() + .cloned() + .collect(), + }, + ) + }) + .collect(), + }) + .collect_vec(), + ) + }) + .collect() + }), ) } @@ -275,6 +308,7 @@ impl ControlStreamManager { pre_applied_graph_info: &InflightGraphInfo, applied_graph_info: Option<&InflightGraphInfo>, actor_ids_to_pre_sync_mutation: HashMap>, + mut new_actors: Option>>, ) -> MetaResult> { fail_point!("inject_barrier_err", |_| risingwave_common::bail!( "inject_barrier_err" @@ -287,17 +321,41 @@ impl ControlStreamManager { }) .unwrap_or(u32::MAX); - for worker_id in pre_applied_graph_info.worker_ids().chain( - applied_graph_info - .into_iter() - .flat_map(|info| info.worker_ids()), - ) { + for worker_id in pre_applied_graph_info + .worker_ids() + .chain( + applied_graph_info + .into_iter() + .flat_map(|info| info.worker_ids()), + ) + .chain( + new_actors + .iter() + .flat_map(|new_actors| new_actors.keys().cloned()), + ) + { if !self.nodes.contains_key(&worker_id) { return Err(anyhow!("unconnected worker node {}", worker_id).into()); } } let mut node_need_collect = HashSet::new(); + let new_actors_location_to_broadcast = new_actors + .iter() + .flatten() + .flat_map(|(worker_id, actor_infos)| { + actor_infos.iter().map(|actor_info| ActorInfo { + actor_id: actor_info.actor.as_ref().unwrap().actor_id, + host: self + .nodes + .get(worker_id) + .expect("have 
checked exist previously") + .worker + .host + .clone(), + }) + }) + .collect_vec(); self.nodes .iter_mut() @@ -344,6 +402,14 @@ impl ControlStreamManager { .flatten() .cloned() .collect(), + broadcast_info: new_actors_location_to_broadcast.clone(), + actors_to_build: new_actors + .as_mut() + .map(|new_actors| new_actors.remove(node_id)) + .into_iter() + .flatten() + .flatten() + .collect(), }, ), ), @@ -472,73 +538,6 @@ impl StreamRpcManager { Uuid::new_v4().to_string() } - pub async fn build_actors( - &self, - node_map: &HashMap, - node_actors: impl Iterator)>, - ) -> MetaResult<()> { - self.make_request( - node_actors.map(|(worker_id, actors)| (node_map.get(&worker_id).unwrap(), actors)), - |client, actors| async move { - let request_id = Self::new_request_id(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actors, "build actors"); - client - .build_actors(BuildActorsRequest { - request_id, - actor_id: actors, - }) - .await - }, - ) - .await?; - Ok(()) - } - - /// Broadcast and update actor info in CN. - /// `node_actors_to_create` must be a subset of `broadcast_worker_ids`. - pub async fn broadcast_update_actor_info( - &self, - worker_nodes: &HashMap, - broadcast_worker_ids: impl Iterator, - actor_infos_to_broadcast: impl Iterator, - node_actors_to_create: impl Iterator)>, - ) -> MetaResult<()> { - let actor_infos = actor_infos_to_broadcast.collect_vec(); - let mut node_actors_to_create = node_actors_to_create.collect::>(); - self.make_request( - broadcast_worker_ids - .map(|worker_id| { - let node = worker_nodes.get(&worker_id).unwrap(); - let actors = node_actors_to_create.remove(&worker_id); - (node, actors) - }), - |client, actors| { - let info = actor_infos.clone(); - async move { - client - .broadcast_actor_info_table(BroadcastActorInfoTableRequest { info }) - .await?; - if let Some(actors) = actors { - let request_id = Self::new_request_id(); - let actor_ids = actors.iter().map(|actor| actor.actor.as_ref().unwrap().actor_id).collect_vec(); - tracing::debug!(request_id = request_id.as_str(), actors = ?actor_ids, "update actors"); - client - .update_actors(UpdateActorsRequest { request_id, actors }) - .await?; - } - Ok(()) - } - }, - ) - .await?; - assert!( - node_actors_to_create.is_empty(), - "remaining uncreated actors: {:?}", - node_actors_to_create - ); - Ok(()) - } - pub async fn drop_actors( &self, node_map: &HashMap, diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs index 0853e28fcb572..08428e5472e23 100644 --- a/src/meta/src/hummock/manager/commit_epoch.rs +++ b/src/meta/src/hummock/manager/commit_epoch.rs @@ -333,13 +333,9 @@ impl HummockManager { } let snapshot = if is_visible_table_committed_epoch { - let snapshot = HummockSnapshot { - committed_epoch, - current_epoch: committed_epoch, - }; + let snapshot = HummockSnapshot { committed_epoch }; let prev_snapshot = self.latest_snapshot.swap(snapshot.into()); assert!(prev_snapshot.committed_epoch < committed_epoch); - assert!(prev_snapshot.current_epoch < committed_epoch); Some(snapshot) } else { None diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 8c06acbd580f9..ded8d507dbffc 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -276,7 +276,6 @@ impl HummockManager { compactor_manager, latest_snapshot: ArcSwap::from_pointee(HummockSnapshot { committed_epoch: INVALID_EPOCH, - current_epoch: INVALID_EPOCH, }), event_sender: tx, delete_object_tracker: Default::default(), 
@@ -432,7 +431,6 @@ impl HummockManager { self.latest_snapshot.store( HummockSnapshot { committed_epoch: redo_state.visible_table_committed_epoch(), - current_epoch: redo_state.visible_table_committed_epoch(), } .into(), ); diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 4b9ddcb18cbc2..56b4836f585a1 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -151,7 +151,6 @@ async fn test_unpin_snapshot_before() { context_id, HummockSnapshot { committed_epoch: epoch, - current_epoch: epoch, }, ) .await @@ -170,7 +169,6 @@ async fn test_unpin_snapshot_before() { context_id, HummockSnapshot { committed_epoch: epoch2, - current_epoch: epoch2, }, ) .await diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 7b320847e64fd..3d621a1d59913 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -281,22 +281,6 @@ impl HummockManager { HummockSnapshot::clone(&snapshot) } - /// We don't commit an epoch without checkpoint. We will only update the `max_current_epoch`. - pub fn update_current_epoch(&self, max_current_epoch: HummockEpoch) -> HummockSnapshot { - // We only update `max_current_epoch`! - let prev_snapshot = self.latest_snapshot.rcu(|snapshot| HummockSnapshot { - committed_epoch: snapshot.committed_epoch, - current_epoch: max_current_epoch, - }); - assert!(prev_snapshot.current_epoch < max_current_epoch); - - tracing::trace!("new current epoch {}", max_current_epoch); - HummockSnapshot { - committed_epoch: prev_snapshot.committed_epoch, - current_epoch: max_current_epoch, - } - } - pub async fn list_change_log_epochs( &self, table_id: u32, diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index daa320c625f64..1cdd8547c8247 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -141,7 +141,6 @@ impl HummockMetaClient for MockHummockMetaClient { self.context_id, HummockSnapshot { committed_epoch: pinned_epochs, - current_epoch: pinned_epochs, }, ) .await @@ -201,11 +200,6 @@ impl HummockMetaClient for MockHummockMetaClient { Ok(()) } - async fn update_current_epoch(&self, epoch: HummockEpoch) -> Result<()> { - self.hummock_manager.update_current_epoch(epoch); - Ok(()) - } - async fn report_vacuum_task(&self, _vacuum_task: VacuumTask) -> Result<()> { Ok(()) } diff --git a/src/meta/src/model/stream.rs b/src/meta/src/model/stream.rs index 6fb30ee6aa9fc..bec6b95cfb0f9 100644 --- a/src/meta/src/model/stream.rs +++ b/src/meta/src/model/stream.rs @@ -527,14 +527,19 @@ impl TableFragments { actors } - /// Returns actor map: `actor_id` => `StreamActor`. 
- pub fn actor_map(&self) -> HashMap { - let mut actor_map = HashMap::default(); - self.fragments.values().for_each(|fragment| { - fragment.actors.iter().for_each(|actor| { - actor_map.insert(actor.actor_id, actor.clone()); + pub fn actors_to_create(&self) -> HashMap> { + let mut actor_map: HashMap<_, Vec<_>> = HashMap::new(); + self.fragments + .values() + .flat_map(|fragment| fragment.actors.iter()) + .for_each(|actor| { + let worker_id = self + .actor_status + .get(&actor.actor_id) + .expect("should exist") + .worker_id(); + actor_map.entry(worker_id).or_default().push(actor.clone()); }); - }); actor_map } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 42ed98b372c7d..6ecb061e509b4 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -31,7 +31,7 @@ use risingwave_common::catalog::TableId; use risingwave_common::hash::{ActorMapping, VirtualNode}; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_meta_model_v2::{actor, fragment, ObjectId, StreamingParallelism}; -use risingwave_pb::common::{ActorInfo, Buffer, PbActorLocation, WorkerNode, WorkerType}; +use risingwave_pb::common::{Buffer, PbActorLocation, WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::fragment::{ @@ -43,8 +43,6 @@ use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, PbDispatcher, PbStreamActor, StreamNode, }; -use risingwave_pb::stream_service::build_actor_info::SubscriptionIds; -use risingwave_pb::stream_service::BuildActorInfo; use thiserror_ext::AsReport; use tokio::sync::oneshot::Receiver; use tokio::sync::{oneshot, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -187,8 +185,6 @@ pub struct RescheduleContext { actor_status: BTreeMap, /// Meta information of all `Fragment`, used to find the `Fragment`'s `Actor` fragment_map: HashMap, - /// Indexes for all `Worker`s - worker_nodes: HashMap, /// Index of all `Actor` upstreams, specific to `Dispatcher` upstream_dispatchers: HashMap>, /// Fragments with stream source @@ -891,7 +887,6 @@ impl ScaleController { actor_map, actor_status, fragment_map, - worker_nodes, upstream_dispatchers, stream_source_fragment_ids, no_shuffle_target_fragment_ids, @@ -900,42 +895,6 @@ impl ScaleController { }) } - pub(crate) async fn create_actors_on_compute_node( - &self, - worker_nodes: &HashMap, - actor_infos_to_broadcast: BTreeMap, - node_actors_to_create: HashMap>, - broadcast_worker_ids: HashSet, - ) -> MetaResult<()> { - self.stream_rpc_manager - .broadcast_update_actor_info( - worker_nodes, - broadcast_worker_ids.into_iter(), - actor_infos_to_broadcast.values().cloned(), - node_actors_to_create.clone().into_iter(), - ) - .await?; - - self.stream_rpc_manager - .build_actors( - worker_nodes, - node_actors_to_create - .iter() - .map(|(node_id, stream_actors)| { - ( - *node_id, - stream_actors - .iter() - .map(|stream_actor| stream_actor.actor.as_ref().unwrap().actor_id) - .collect_vec(), - ) - }), - ) - .await?; - - Ok(()) - } - /// From the high-level [`WorkerReschedule`] to the low-level reschedule plan [`Reschedule`]. 
/// /// Returns `(reschedule_fragment, applied_reschedules)` @@ -1296,116 +1255,6 @@ impl ScaleController { } } - if !options.skip_create_new_actors { - // After modification, for newly created actors, both upstream and downstream actor ids - // have been modified - let mut actor_infos_to_broadcast = BTreeMap::new(); - let mut node_actors_to_create: HashMap> = HashMap::new(); - let mut broadcast_worker_ids = HashSet::new(); - - let subscriptions: HashMap<_, SubscriptionIds> = self - .metadata_manager - .get_mv_depended_subscriptions() - .await? - .iter() - .map(|(table_id, subscriptions)| { - ( - table_id.table_id, - SubscriptionIds { - subscription_ids: subscriptions.keys().cloned().collect(), - }, - ) - }) - .collect(); - - for actors_to_create in fragment_actors_to_create.values() { - for (new_actor_id, worker_id) in actors_to_create { - let new_actor = new_created_actors.get(new_actor_id).unwrap(); - for upstream_actor_id in &new_actor.upstream_actor_id { - if new_created_actors.contains_key(upstream_actor_id) { - continue; - } - - let upstream_worker_id = ctx.actor_id_to_worker_id(upstream_actor_id)?; - - let upstream_worker = - ctx.worker_nodes.get(&upstream_worker_id).with_context(|| { - format!("upstream worker {} not found", upstream_worker_id) - })?; - - // Force broadcast upstream actor info, because the actor information of the new - // node may not have been synchronized yet - actor_infos_to_broadcast.insert( - *upstream_actor_id, - ActorInfo { - actor_id: *upstream_actor_id, - host: upstream_worker.host.clone(), - }, - ); - - broadcast_worker_ids.insert(upstream_worker_id); - } - - for dispatcher in &new_actor.dispatcher { - for downstream_actor_id in &dispatcher.downstream_actor_id { - if new_created_actors.contains_key(downstream_actor_id) { - continue; - } - let downstream_worker_id = - ctx.actor_id_to_worker_id(downstream_actor_id)?; - - let downstream_worker = ctx - .worker_nodes - .get(&downstream_worker_id) - .with_context(|| { - format!("downstream worker {} not found", downstream_worker_id) - })?; - - actor_infos_to_broadcast.insert( - *downstream_actor_id, - ActorInfo { - actor_id: *downstream_actor_id, - host: downstream_worker.host.clone(), - }, - ); - - broadcast_worker_ids.insert(downstream_worker_id); - } - } - - let worker = ctx.worker_nodes.get(worker_id).unwrap(); - - node_actors_to_create - .entry(worker.id) - .or_default() - .push(BuildActorInfo { - actor: Some(new_actor.clone()), - // TODO: may include only the subscriptions related to the table fragment - // of the actor. - related_subscriptions: subscriptions.clone(), - }); - - broadcast_worker_ids.insert(worker.id); - - actor_infos_to_broadcast.insert( - *new_actor_id, - ActorInfo { - actor_id: *new_actor_id, - host: worker.host.clone(), - }, - ); - } - } - - self.create_actors_on_compute_node( - &ctx.worker_nodes, - actor_infos_to_broadcast, - node_actors_to_create, - broadcast_worker_ids, - ) - .await?; - } - // For stream source fragments, we need to reallocate the splits. 
// Because we are in the Pause state, so it's no problem to reallocate let mut fragment_stream_source_actor_splits = HashMap::new(); diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 612fc754da24a..a8e8bc47752a5 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -35,7 +35,7 @@ use crate::barrier::{ }; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, NotificationVersion, StreamingJob}; use crate::model::{ActorId, FragmentId, MetadataModel, TableFragments, TableParallelism}; -use crate::stream::{to_build_actor_info, SourceManagerRef}; +use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; pub type GlobalStreamManagerRef = Arc; @@ -291,12 +291,6 @@ impl GlobalStreamManager { // try to cancel buffered creating command. if self.barrier_scheduler.try_cancel_scheduled_create(table_id) { tracing::debug!("cancelling streaming job {table_id} in buffer queue."); - let node_actors = table_fragments.worker_actor_ids(); - let cluster_info = - self.metadata_manager.get_streaming_cluster_info().await?; - self.stream_rpc_manager - .drop_actors(&cluster_info.worker_nodes, node_actors.into_iter()) - .await?; if let MetadataManager::V1(mgr) = &self.metadata_manager { mgr.fragment_manager @@ -334,66 +328,6 @@ impl GlobalStreamManager { bail!("receiver failed to get notification version for finished stream job") } - async fn build_actors( - &self, - table_fragments: &TableFragments, - building_locations: &Locations, - existing_locations: &Locations, - subscription_depend_table_id: TableId, - ) -> MetaResult<()> { - let actor_map = table_fragments.actor_map(); - - // Actors on each stream node will need to know where their upstream lies. `actor_info` - // includes such information. It contains: - // 1. actors in the current create-streaming-job request. - // 2. all upstream actors. - let actor_infos_to_broadcast = building_locations - .actor_infos() - .chain(existing_locations.actor_infos()); - - let building_worker_actors = building_locations.worker_actors(); - let subscriptions = self - .metadata_manager - .get_mv_depended_subscriptions() - .await?; - - // We send RPC request in two stages. - // The first stage does 2 things: broadcast actor info, and send local actor ids to - // different WorkerNodes. Such that each WorkerNode knows the overall actor - // allocation, but not actually builds it. We initialize all channels in this stage. - self.stream_rpc_manager - .broadcast_update_actor_info( - &building_locations.worker_locations, - building_worker_actors.keys().cloned(), - actor_infos_to_broadcast, - building_worker_actors.iter().map(|(worker_id, actors)| { - let stream_actors = actors - .iter() - .map(|actor_id| { - to_build_actor_info( - actor_map[actor_id].clone(), - &subscriptions, - subscription_depend_table_id, - ) - }) - .collect::>(); - (*worker_id, stream_actors) - }), - ) - .await?; - - // In the second stage, each [`WorkerNode`] builds local actors and connect them with - // channels. 
- self.stream_rpc_manager - .build_actors( - &building_locations.worker_locations, - building_worker_actors.into_iter(), - ) - .await?; - - Ok(()) - } - async fn create_streaming_job_impl( &self, table_fragments: TableFragments, @@ -401,8 +335,6 @@ impl GlobalStreamManager { streaming_job, dispatchers, upstream_root_actors, - building_locations, - existing_locations, definition, create_type, ddl_type, @@ -415,13 +347,6 @@ impl GlobalStreamManager { let mut replace_table_command = None; let mut replace_table_id = None; - self.build_actors( - &table_fragments, - &building_locations, - &existing_locations, - table_fragments.table_id(), - ) - .await?; tracing::debug!( table_id = %table_fragments.table_id(), "built actors finished" @@ -430,14 +355,6 @@ impl GlobalStreamManager { let need_pause = replace_table_job_info.is_some(); if let Some((streaming_job, context, table_fragments)) = replace_table_job_info { - self.build_actors( - &table_fragments, - &context.building_locations, - &context.existing_locations, - context.old_table_fragments.table_id(), - ) - .await?; - match &self.metadata_manager { MetadataManager::V1(mgr) => { // Add table fragments to meta store with state: `State::Initial`. @@ -562,20 +479,11 @@ impl GlobalStreamManager { old_table_fragments, merge_updates, dispatchers, - building_locations, - existing_locations, dummy_id, streaming_job, + .. }: ReplaceTableContext, ) -> MetaResult<()> { - self.build_actors( - &table_fragments, - &building_locations, - &existing_locations, - old_table_fragments.table_id(), - ) - .await?; - let dummy_table_id = table_fragments.table_id(); let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?; @@ -908,51 +816,6 @@ mod tests { type StreamingControlStreamStream = impl Stream>; - async fn update_actors( - &self, - request: Request, - ) -> std::result::Result, Status> { - let req = request.into_inner(); - let mut guard = self.inner.actor_streams.lock().unwrap(); - for actor in req.get_actors() { - let actor = actor.actor.as_ref().unwrap(); - guard.insert(actor.get_actor_id(), actor.clone()); - } - - Ok(Response::new(UpdateActorsResponse { status: None })) - } - - async fn build_actors( - &self, - request: Request, - ) -> std::result::Result, Status> { - let req = request.into_inner(); - let mut guard = self.inner.actor_ids.lock().unwrap(); - for id in req.get_actor_id() { - guard.insert(*id); - } - - Ok(Response::new(BuildActorsResponse { - request_id: "".to_string(), - status: None, - })) - } - - async fn broadcast_actor_info_table( - &self, - request: Request, - ) -> std::result::Result, Status> { - let req = request.into_inner(); - let mut guard = self.inner.actor_infos.lock().unwrap(); - for info in req.get_info() { - guard.insert(info.get_actor_id(), info.get_host()?.clone()); - } - - Ok(Response::new(BroadcastActorInfoTableResponse { - status: None, - })) - } - async fn drop_actors( &self, _request: Request, @@ -982,6 +845,24 @@ mod tests { })); } streaming_control_stream_request::Request::InjectBarrier(req) => { + { + let mut guard = inner.actor_infos.lock().unwrap(); + for info in req.broadcast_info { + guard.insert( + info.get_actor_id(), + info.get_host().unwrap().clone(), + ); + } + } + { + let mut guard = inner.actor_streams.lock().unwrap(); + let mut actor_ids = inner.actor_ids.lock().unwrap(); + for actor in req.actors_to_build { + let actor = actor.actor.as_ref().unwrap(); + assert!(actor_ids.insert(actor.actor_id)); + guard.insert(actor.get_actor_id(), actor.clone()); + } + } let _ = 
tx.send(Ok(StreamingControlStreamResponse { response: Some( streaming_control_stream_response::Response::CompleteBarrier( diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index 5c25a59afa7f8..df42a0da3ff35 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -39,7 +39,6 @@ pub trait HummockMetaClient: Send + Sync + 'static { async fn get_new_sst_ids(&self, number: u32) -> Result; // We keep `commit_epoch` only for test/benchmark. async fn commit_epoch(&self, epoch: HummockEpoch, sync_result: SyncResult) -> Result<()>; - async fn update_current_epoch(&self, epoch: HummockEpoch) -> Result<()>; async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> Result<()>; async fn trigger_manual_compaction( &self, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 5f69a3f779647..b4e06d8690b72 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1489,7 +1489,6 @@ impl HummockMetaClient for MetaClient { // For unpin_snapshot_before, we do not care about snapshots list but only min epoch. min_snapshot: Some(HummockSnapshot { committed_epoch: pinned_epochs, - current_epoch: pinned_epochs, }), }; self.inner.unpin_snapshot_before(req).await?; @@ -1508,10 +1507,6 @@ impl HummockMetaClient for MetaClient { panic!("Only meta service can commit_epoch in production.") } - async fn update_current_epoch(&self, _epoch: HummockEpoch) -> Result<()> { - panic!("Only meta service can update_current_epoch in production.") - } - async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> Result<()> { let req = ReportVacuumTaskRequest { vacuum_task: Some(vacuum_task), diff --git a/src/rpc_client/src/stream_client.rs b/src/rpc_client/src/stream_client.rs index c3f876549a36d..920b6f0777f37 100644 --- a/src/rpc_client/src/stream_client.rs +++ b/src/rpc_client/src/stream_client.rs @@ -70,10 +70,7 @@ pub type StreamClientPoolRef = Arc; macro_rules! for_all_stream_rpc { ($macro:ident) => { $macro! { - { 0, update_actors, UpdateActorsRequest, UpdateActorsResponse } - ,{ 0, build_actors, BuildActorsRequest, BuildActorsResponse } - ,{ 0, broadcast_actor_info_table, BroadcastActorInfoTableRequest, BroadcastActorInfoTableResponse } - ,{ 0, drop_actors, DropActorsRequest, DropActorsResponse } + { 0, drop_actors, DropActorsRequest, DropActorsResponse } ,{ 0, wait_epoch_commit, WaitEpochCommitRequest, WaitEpochCommitResponse } } }; diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index ea4db8c09b63d..9e6962ab117aa 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -34,6 +34,7 @@ use risingwave_common::util::epoch::EPOCH_SPILL_TIME_MASK; use risingwave_pb::common::{batch_query_epoch, BatchQueryEpoch}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use sstable_info::SstableInfo; +use tracing::warn; use crate::key_range::KeyRangeCommon; use crate::table_stats::TableStatsMap; @@ -206,8 +207,6 @@ impl PartialEq for LocalSstableInfo { pub enum HummockReadEpoch { /// We need to wait the `max_committed_epoch` Committed(HummockEpoch), - /// We need to wait the `max_current_epoch` - Current(HummockEpoch), /// We don't need to wait epoch, we usually do stream reading with it. NoWait(HummockEpoch), /// We don't need to wait epoch. 
@@ -219,7 +218,15 @@ impl From for HummockReadEpoch { fn from(e: BatchQueryEpoch) -> Self { match e.epoch.unwrap() { batch_query_epoch::Epoch::Committed(epoch) => HummockReadEpoch::Committed(epoch), - batch_query_epoch::Epoch::Current(epoch) => HummockReadEpoch::Current(epoch), + batch_query_epoch::Epoch::Current(epoch) => { + if epoch != HummockEpoch::MAX { + warn!( + epoch, + "ignore specified current epoch and set it to u64::MAX" + ); + } + HummockReadEpoch::NoWait(HummockEpoch::MAX) + } batch_query_epoch::Epoch::Backup(epoch) => HummockReadEpoch::Backup(epoch), batch_query_epoch::Epoch::TimeTravel(epoch) => HummockReadEpoch::TimeTravel(epoch), } @@ -236,7 +243,6 @@ impl HummockReadEpoch { pub fn get_epoch(&self) -> HummockEpoch { *match self { HummockReadEpoch::Committed(epoch) => epoch, - HummockReadEpoch::Current(epoch) => epoch, HummockReadEpoch::NoWait(epoch) => epoch, HummockReadEpoch::Backup(epoch) => epoch, HummockReadEpoch::TimeTravel(epoch) => epoch, diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index 0634ae1f30d10..282409f394476 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -147,10 +147,6 @@ impl ReplayStateStore for GlobalReplayImpl { Ok(result.sync_size) } - fn seal_epoch(&self, epoch_id: u64, is_checkpoint: bool) { - self.store.seal_epoch(epoch_id, is_checkpoint); - } - async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result { let prev_version_id = match &info { Info::HummockVersionDeltas(deltas) => deltas.version_deltas.last().map(|d| d.prev_id), @@ -181,13 +177,6 @@ impl ReplayStateStore for GlobalReplayImpl { .map_err(|_| TraceError::TryWaitEpochFailed)?; Ok(()) } - - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()> { - self.store - .validate_read_epoch(epoch) - .map_err(|_| TraceError::ValidateReadEpochFailed)?; - Ok(()) - } } pub(crate) struct LocalReplayImpl(LocalHummockStorage); diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index c53ab48b567da..7f3d35f16b80b 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -1053,7 +1053,6 @@ async fn test_multiple_epoch_sync() { .storage .start_epoch(epoch4, HashSet::from_iter([TEST_TABLE_ID])); hummock_storage.seal_current_epoch(epoch4, SealCurrentEpochOptions::for_test()); - test_env.storage.seal_epoch(epoch1, false); let sync_result2 = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); let sync_result3 = test_env.storage.seal_and_sync_epoch(epoch3).await.unwrap(); test_get().await; diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index 1df58074e22d0..35f3d08a9ed8a 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1218,7 +1218,6 @@ async fn test_multiple_epoch_sync_v2() { } }; test_get().await; - hummock_storage.seal_epoch(epoch1, false); let sync_result2 = hummock_storage.seal_and_sync_epoch(epoch2).await.unwrap(); let sync_result3 = hummock_storage.seal_and_sync_epoch(epoch3).await.unwrap(); test_get().await; diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index b9e800a8bbea8..979bf067db861 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ 
b/src/storage/hummock_trace/src/collector.rs @@ -216,13 +216,6 @@ impl TraceSpan { Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type) } - pub fn new_validate_read_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan { - Self::new_global_op( - Operation::ValidateReadEpoch(epoch.into()), - StorageType::Global, - ) - } - pub fn new_try_wait_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan { Self::new_global_op(Operation::TryWaitEpoch(epoch.into()), StorageType::Global) } @@ -295,14 +288,6 @@ impl TraceSpan { ) } - pub fn new_seal_span( - epoch: u64, - is_checkpoint: bool, - storage_type: StorageType, - ) -> MayTraceSpan { - Self::new_global_op(Operation::Seal(epoch, is_checkpoint), storage_type) - } - pub fn new_local_storage_span( option: TracedNewLocalOptions, storage_type: StorageType, diff --git a/src/storage/hummock_trace/src/error.rs b/src/storage/hummock_trace/src/error.rs index e80a18aacc869..143918204e407 100644 --- a/src/storage/hummock_trace/src/error.rs +++ b/src/storage/hummock_trace/src/error.rs @@ -51,9 +51,6 @@ pub enum TraceError { #[error("{0}")] Other(&'static str), - #[error("failed to validate epoch")] - ValidateReadEpochFailed, - #[error("failed to try wait epoch")] TryWaitEpochFailed, diff --git a/src/storage/hummock_trace/src/opts.rs b/src/storage/hummock_trace/src/opts.rs index 2a5b62f046d32..5d480cca96b58 100644 --- a/src/storage/hummock_trace/src/opts.rs +++ b/src/storage/hummock_trace/src/opts.rs @@ -194,7 +194,6 @@ pub type TracedHummockEpoch = u64; #[derive(Debug, Clone, PartialEq, Eq, Decode, Encode)] pub enum TracedHummockReadEpoch { Committed(TracedHummockEpoch), - Current(TracedHummockEpoch), NoWait(TracedHummockEpoch), Backup(TracedHummockEpoch), TimeTravel(TracedHummockEpoch), @@ -204,7 +203,6 @@ impl From for TracedHummockReadEpoch { fn from(value: HummockReadEpoch) -> Self { match value { HummockReadEpoch::Committed(epoch) => Self::Committed(epoch), - HummockReadEpoch::Current(epoch) => Self::Current(epoch), HummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch), HummockReadEpoch::Backup(epoch) => Self::Backup(epoch), HummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch), @@ -216,7 +214,6 @@ impl From for HummockReadEpoch { fn from(value: TracedHummockReadEpoch) -> Self { match value { TracedHummockReadEpoch::Committed(epoch) => Self::Committed(epoch), - TracedHummockReadEpoch::Current(epoch) => Self::Current(epoch), TracedHummockReadEpoch::NoWait(epoch) => Self::NoWait(epoch), TracedHummockReadEpoch::Backup(epoch) => Self::Backup(epoch), TracedHummockReadEpoch::TimeTravel(epoch) => Self::TimeTravel(epoch), diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index fc2b0bb1c5c22..2995b7f13c8a7 100644 --- a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -148,9 +148,6 @@ pub enum Operation { /// Sync operation of Hummock. Sync(u64, Vec), - /// Seal operation of Hummock. - Seal(u64, bool), - /// `MetaMessage` operation of Hummock. 
MetaMessage(Box), @@ -175,9 +172,6 @@ pub enum Operation { opts: TracedSealCurrentEpochOptions, }, - /// validate read epoch - ValidateReadEpoch(TracedHummockReadEpoch), - LocalStorageEpoch, LocalStorageIsDirty, @@ -296,7 +290,6 @@ pub enum OperationResult { Sync(TraceResult), NotifyHummock(TraceResult<()>), TryWaitEpoch(TraceResult<()>), - ValidateReadEpoch(TraceResult<()>), LocalStorageEpoch(TraceResult), LocalStorageIsDirty(TraceResult), } diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index 9996d6212a01f..91e80cad1c4b6 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -116,11 +116,9 @@ pub trait ReplayWrite { #[async_trait::async_trait] pub trait ReplayStateStore { async fn sync(&self, id: u64, table_ids: Vec) -> Result; - fn seal_epoch(&self, epoch_id: u64, is_checkpoint: bool); async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } // define mock trait for replay interfaces @@ -146,12 +144,10 @@ mock! { #[async_trait::async_trait] impl ReplayStateStore for GlobalReplayInterface{ async fn sync(&self, id: u64, table_ids: Vec) -> Result; - fn seal_epoch(&self, epoch_id: u64, is_checkpoint: bool); async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64, ) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } impl GlobalReplay for GlobalReplayInterface{} } diff --git a/src/storage/hummock_trace/src/replay/runner.rs b/src/storage/hummock_trace/src/replay/runner.rs index cb9ddacda1763..3794671ace2a5 100644 --- a/src/storage/hummock_trace/src/replay/runner.rs +++ b/src/storage/hummock_trace/src/replay/runner.rs @@ -90,9 +90,7 @@ mod tests { async fn test_replay() { let mut mock_reader = MockTraceReader::new(); let get_result = TracedBytes::from(vec![54, 32, 198, 236, 24]); - let seal_checkpoint = true; let sync_id = 4561245432; - let seal_id = 5734875243; let opts1 = TracedNewLocalOptions::for_test(1); let opts2 = TracedNewLocalOptions::for_test(2); @@ -197,7 +195,6 @@ mod tests { .map(|(record_id, op)| Ok(Record::new(storage_type3, record_id, op))); let mut non_local: Vec> = vec![ - (12, Operation::Seal(seal_id, seal_checkpoint)), (12, Operation::Finish), (13, Operation::Sync(sync_id, vec![1, 2, 3])), ( @@ -251,12 +248,6 @@ mod tests { .times(1) .returning(|_, _| Ok(0)); - mock_replay - .expect_seal_epoch() - .with(predicate::eq(seal_id), predicate::eq(seal_checkpoint)) - .times(1) - .return_const(()); - let mut replay = HummockReplay::new(mock_reader, mock_replay); replay.run().await.unwrap(); diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index d566dbbe18410..8a02c3efde196 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -267,10 +267,6 @@ impl ReplayWorker { panic!("expect sync result, but got {:?}", res); } } - Operation::Seal(epoch_id, is_checkpoint) => { - assert_eq!(storage_type, StorageType::Global); - replay.seal_epoch(epoch_id, is_checkpoint); - } Operation::IterNext(id) => { let iter = iters_map.get_mut(&id).expect("iter not 
in worker"); let actual = iter.next().await; @@ -332,23 +328,6 @@ impl ReplayWorker { let local_storage = local_storages.get_mut(&storage_type).unwrap(); local_storage.seal_current_epoch(epoch, opts); } - Operation::ValidateReadEpoch(epoch) => { - assert_eq!(storage_type, StorageType::Global); - let res = res_rx.recv().await.expect("recv result failed"); - let actual = replay.validate_read_epoch(epoch.into()); - if let OperationResult::ValidateReadEpoch(expected) = res { - assert_eq!( - TraceResult::from(actual), - expected, - "validate_read_epoch wrong" - ); - } else { - panic!( - "wrong validate_read_epoch result, expect epoch result, but got {:?}", - res - ); - } - } Operation::LocalStorageEpoch => { assert_ne!(storage_type, StorageType::Global); let local_storage = local_storages.get_mut(&storage_type).unwrap(); diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index 9663a7787c474..4445a74884d5a 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -121,10 +121,6 @@ impl HummockMetaClient for MonitoredHummockMetaClient { .await } - async fn update_current_epoch(&self, epoch: HummockEpoch) -> Result<()> { - self.meta_client.update_current_epoch(epoch).await - } - async fn subscribe_compaction_event( &self, ) -> Result<( diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index 6753131c402f5..b64752fca7fd6 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -15,14 +15,12 @@ use std::collections::HashSet; use std::future::Future; use std::ops::{Bound, Deref}; -use std::sync::atomic::{AtomicU64, Ordering as MemOrdering}; use std::sync::Arc; use arc_swap::ArcSwap; use bytes::Bytes; use futures::FutureExt; use itertools::Itertools; -use more_asserts::assert_gt; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::is_max_epoch; use risingwave_common_service::{NotificationClient, ObserverManager}; @@ -61,7 +59,7 @@ use crate::hummock::{ MemoryLimiter, SstableObjectIdManager, SstableObjectIdManagerRef, SstableStoreRef, }; use crate::mem_table::ImmutableMemtable; -use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics, StoreLocalStatistic}; +use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics}; use crate::opts::StorageOpts; use crate::store::*; @@ -99,8 +97,6 @@ pub struct HummockStorage { version_update_notifier_tx: Arc>, - seal_epoch: Arc, - pinned_version: Arc>, hummock_version_reader: HummockVersionReader, @@ -111,9 +107,6 @@ pub struct HummockStorage { backup_reader: BackupReaderRef, - /// `current_epoch` < `min_current_epoch` cannot be read. 
- min_current_epoch: Arc, - write_limiter: WriteLimiterRef, compact_await_tree_reg: Option, @@ -211,12 +204,6 @@ impl HummockStorage { await_tree_reg.clone(), ); - let seal_epoch = Arc::new(AtomicU64::new( - pinned_version.visible_table_committed_epoch(), - )); - let min_current_epoch = Arc::new(AtomicU64::new( - pinned_version.visible_table_committed_epoch(), - )); let hummock_event_handler = HummockEventHandler::new( version_update_rx, pinned_version, @@ -234,7 +221,6 @@ impl HummockStorage { sstable_object_id_manager, buffer_tracker: hummock_event_handler.buffer_tracker().clone(), version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(), - seal_epoch, hummock_event_sender: event_tx.clone(), _version_update_sender: version_update_tx, pinned_version: hummock_event_handler.pinned_version(), @@ -248,7 +234,6 @@ impl HummockStorage { }), read_version_mapping: hummock_event_handler.read_version_mapping(), backup_reader, - min_current_epoch, write_limiter, compact_await_tree_reg: await_tree_reg, hummock_meta_client, @@ -525,11 +510,6 @@ impl HummockStorage { .send(HummockEvent::Clear(tx, version_id)) .expect("should send success"); rx.await.expect("should wait success"); - - let epoch = self.pinned_version.load().visible_table_committed_epoch(); - self.min_current_epoch - .store(HummockEpoch::MAX, MemOrdering::SeqCst); - self.seal_epoch.store(epoch, MemOrdering::SeqCst); } /// Declare the start of an epoch. This information is provided for spill so that the spill task won't @@ -639,7 +619,6 @@ impl StateStore for HummockStorage { /// Waits until the local hummock version contains the epoch. If `wait_epoch` is `Current`, /// we will only check whether it is le `sealed_epoch` and won't wait. async fn try_wait_epoch(&self, wait_epoch: HummockReadEpoch) -> StorageResult<()> { - self.validate_read_epoch(wait_epoch)?; let wait_epoch = match wait_epoch { HummockReadEpoch::Committed(epoch) => { assert!(!is_max_epoch(epoch), "epoch should not be MAX EPOCH"); @@ -664,55 +643,9 @@ impl StateStore for HummockStorage { }) } - fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { - // Update `seal_epoch` synchronously, - // as `HummockEvent::SealEpoch` is handled asynchronously. 
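Since `try_wait_epoch` no longer calls `validate_read_epoch`, and the `seal_epoch` / `min_current_epoch` bookkeeping that backed it is deleted here, waiting for a committed epoch is the only synchronization left on the read path. A usage sketch under that assumption follows; `store` is any implementor of the `StateStore` trait, and the import paths for `StorageResult` are assumed rather than taken from this diff.

// Usage sketch (not part of the diff): wait for a committed epoch, then read.
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::error::StorageResult; // path assumed
use risingwave_storage::StateStore;

async fn read_after_commit(store: &impl StateStore, committed_epoch: u64) -> StorageResult<()> {
    // Blocks until the local Hummock version has caught up with `committed_epoch`.
    store
        .try_wait_epoch(HummockReadEpoch::Committed(committed_epoch))
        .await?;
    // A `HummockReadEpoch::NoWait(..)` read would return immediately instead;
    // there is no separate post-read validation step any more.
    Ok(())
}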
- let prev_epoch = self.seal_epoch.swap(epoch, MemOrdering::SeqCst); - assert_gt!(epoch, prev_epoch); - - if is_checkpoint { - let _ = self.min_current_epoch.compare_exchange( - HummockEpoch::MAX, - epoch, - MemOrdering::SeqCst, - MemOrdering::SeqCst, - ); - } - StoreLocalStatistic::flush_all(); - } - fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_ { self.new_local_inner(option) } - - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - if let HummockReadEpoch::Current(read_current_epoch) = epoch { - assert!( - !is_max_epoch(read_current_epoch), - "epoch should not be MAX EPOCH" - ); - let sealed_epoch = self.seal_epoch.load(MemOrdering::SeqCst); - if read_current_epoch > sealed_epoch { - tracing::warn!( - "invalid barrier read {} > max seal epoch {}", - read_current_epoch, - sealed_epoch - ); - return Err(HummockError::read_current_epoch().into()); - } - - let min_current_epoch = self.min_current_epoch.load(MemOrdering::SeqCst); - if read_current_epoch < min_current_epoch { - tracing::warn!( - "invalid barrier read {} < min current epoch {}", - read_current_epoch, - min_current_epoch - ); - return Err(HummockError::read_current_epoch().into()); - } - } - Ok(()) - } } #[cfg(any(test, feature = "test"))] @@ -721,7 +654,6 @@ impl HummockStorage { &self, epoch: u64, ) -> StorageResult { - self.seal_epoch(epoch, true); let table_ids = self .pinned_version .load() diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index e8da757523ed5..7416f54688a5f 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -750,15 +750,9 @@ impl StateStore for RangeKvStateStore { } } - fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {} - async fn new_local(&self, option: NewLocalOptions) -> Self::Local { MemtableLocalStateStore::new(self.clone(), option) } - - fn validate_read_epoch(&self, _epoch: HummockReadEpoch) -> StorageResult<()> { - Ok(()) - } } pub struct RangeKvStateStoreIter { diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 30350d9b9c648..ec2785c354229 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -324,10 +324,6 @@ impl StateStore for MonitoredStateStore { } } - fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { - self.inner.seal_epoch(epoch, is_checkpoint); - } - fn monitored( self, _storage_metrics: Arc, @@ -344,10 +340,6 @@ impl StateStore for MonitoredStateStore { self.storage_metrics.clone(), ) } - - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - self.inner.validate_read_epoch(epoch) - } } impl MonitoredStateStore { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 246313f71a498..dac8712a924d7 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -265,23 +265,9 @@ impl StateStore for TracedStateStore { }) } - fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { - let _span = TraceSpan::new_seal_span(epoch, is_checkpoint, self.storage_type); - self.inner.seal_epoch(epoch, is_checkpoint); - } - async fn new_local(&self, options: NewLocalOptions) -> Self::Local { TracedStateStore::new_local(self.inner.new_local(options.clone()).await, options) } - - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - let span = TraceSpan::new_validate_read_epoch_span(epoch); - let res = self.inner.validate_read_epoch(epoch); - 
span.may_send_result(OperationResult::ValidateReadEpoch( - res.as_ref().map(|o| *o).into(), - )); - res - } } impl StateStoreRead for TracedStateStore { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 42737d914f536..a7fb7c3643ed0 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -182,18 +182,10 @@ impl StateStore for PanicStateStore { async { panic!("should not await sync epoch from the panic state store!") } } - fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) { - panic!("should not update current epoch from the panic state store!"); - } - #[allow(clippy::unused_async)] async fn new_local(&self, _option: NewLocalOptions) -> Self::Local { panic!("should not call new local from the panic state store"); } - - fn validate_read_epoch(&self, _epoch: HummockReadEpoch) -> StorageResult<()> { - panic!("should not call validate_read_epoch from the panic state store"); - } } pub struct PanicStateStoreIter(PhantomData); diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index f382bf5fc2d5d..91f79231f6939 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -356,18 +356,12 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { fn sync(&self, epoch: u64, table_ids: HashSet) -> impl SyncFuture; - /// update max current epoch in storage. - fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); - /// Creates a [`MonitoredStateStore`] from this state store, with given `stats`. fn monitored(self, storage_metrics: Arc) -> MonitoredStateStore { MonitoredStateStore::new(self, storage_metrics) } fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_; - - /// Validates whether store can serve `epoch` at the moment. - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()>; } /// A state store that is dedicated for streaming operator, which only reads the uncommitted data diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 22d30a5540af4..d9261d67e24bc 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -588,10 +588,6 @@ pub mod verify { } } - fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { - self.actual.seal_epoch(epoch, is_checkpoint) - } - async fn new_local(&self, option: NewLocalOptions) -> Self::Local { let expected = if let Some(expected) = &self.expected { Some(expected.new_local(option.clone()).await) @@ -604,10 +600,6 @@ pub mod verify { _phantom: PhantomData::<()>, } } - - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - self.actual.validate_read_epoch(epoch) - } } impl Deref for VerifyStateStore { @@ -1170,11 +1162,7 @@ pub mod boxed_state_store { table_ids: HashSet, ) -> BoxFuture<'static, StorageResult>; - fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); - async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore; - - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()>; } #[async_trait::async_trait] @@ -1191,17 +1179,9 @@ pub mod boxed_state_store { self.sync(epoch, table_ids).boxed() } - fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { - self.seal_epoch(epoch, is_checkpoint); - } - async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore { Box::new(self.new_local(option).await) } - - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - self.validate_read_epoch(epoch) - } } pub type BoxDynamicDispatchedStateStore = Box; @@ 
-1285,19 +1265,11 @@ pub mod boxed_state_store { self.deref().sync(epoch, table_ids) } - fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { - self.deref().seal_epoch(epoch, is_checkpoint) - } - fn new_local( &self, option: NewLocalOptions, ) -> impl Future + Send + '_ { self.deref().new_local(option) } - - fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> StorageResult<()> { - self.deref().validate_read_epoch(epoch) - } } } diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index ba5df435db768..7a0ad76cce4a5 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -387,9 +387,6 @@ impl StorageTableInner { ..Default::default() }; if let Some(value) = self.store.get(serialized_pk, epoch, read_options).await? { - // Refer to [`StorageTableInnerIterInner::new`] for necessity of `validate_read_epoch`. - self.store.validate_read_epoch(wait_epoch)?; - let row = self.row_serde.deserialize(&value)?; let result_row_in_value = self.mapping.project(OwnedRow::new(row)); @@ -872,11 +869,6 @@ impl StorageTableInnerIterInner { let raw_epoch = epoch.get_epoch(); store.try_wait_epoch(epoch).await?; let iter = store.iter(table_key_range, raw_epoch, read_options).await?; - // For `HummockStorage`, a cluster recovery will clear storage data and make subsequent - // `HummockReadEpoch::Current` read incomplete. - // `validate_read_epoch` is a safeguard against that incorrect read. It rejects the read - // result if any recovery has happened after `try_wait_epoch`. - store.validate_read_epoch(epoch)?; let iter = Self { iter, mapping, diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index f75767d22ea08..f4e62e429effa 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -526,7 +526,6 @@ mod tests { let epoch3 = epoch2.next_epoch(); writer.flush_current_epoch(epoch3, true).await.unwrap(); - test_env.storage.seal_epoch(epoch1, false); let sync_result = test_env.storage.seal_and_sync_epoch(epoch2).await.unwrap(); assert!(!sync_result.uncommitted_ssts.is_empty()); @@ -633,8 +632,6 @@ mod tests { let epoch3 = epoch2.next_epoch(); writer.flush_current_epoch(epoch3, true).await.unwrap(); - test_env.storage.seal_epoch(epoch1, false); - reader.init().await.unwrap(); match reader.next_item().await.unwrap() { ( @@ -1133,7 +1130,6 @@ mod tests { } // Truncation of reader1 on epoch1 should work because it is before this sync - test_env.storage.seal_epoch(epoch1, false); test_env.commit_epoch(epoch2).await; test_env .storage @@ -1727,7 +1723,6 @@ mod tests { let epoch3 = epoch2.next_epoch(); writer.flush_current_epoch(epoch3, true).await.unwrap(); - test_env.storage.seal_epoch(epoch1, false); test_env.commit_epoch(epoch2).await; reader.init().await.unwrap(); diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index ff44399b22a34..fa0a5624b8ad5 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -1183,7 +1183,7 @@ mod tests { let row = table .get_row( &OwnedRow::new(vec![Some(x.into())]), - HummockReadEpoch::Current(u64::MAX), + HummockReadEpoch::NoWait(u64::MAX), ) .await .unwrap(); diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index b6cf8c525a5ed..88e86a5998758 100644 --- 
a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -52,7 +52,6 @@ use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::runtime::BackgroundShutdownRuntime; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::{HummockVersionId, LocalSstableInfo, SyncResult}; -use risingwave_pb::common::ActorInfo; use risingwave_pb::stream_plan::barrier::BarrierKind; use risingwave_pb::stream_service::streaming_control_stream_request::{InitRequest, Request}; use risingwave_pb::stream_service::streaming_control_stream_response::{ @@ -215,18 +214,6 @@ pub(super) enum LocalActorOperation { actors: Vec, result_sender: oneshot::Sender<()>, }, - UpdateActors { - actors: Vec, - result_sender: oneshot::Sender>, - }, - BuildActors { - actors: Vec, - result_sender: oneshot::Sender>, - }, - UpdateActorInfo { - new_actor_infos: Vec, - result_sender: oneshot::Sender>, - }, TakeReceiver { ids: UpDownActorIds, result_sender: oneshot::Sender>, @@ -431,6 +418,14 @@ impl LocalBarrierWorker { match request.request.expect("should not be empty") { Request::InjectBarrier(req) => { let barrier = Barrier::from_protobuf(req.get_barrier().unwrap())?; + self.update_actor_info(req.broadcast_info)?; + let actors = req + .actors_to_build + .iter() + .map(|actor| actor.actor.as_ref().unwrap().actor_id) + .collect_vec(); + self.update_actors(req.actors_to_build)?; + self.start_create_actors(&actors)?; self.send_barrier( &barrier, req.actor_ids_to_collect.into_iter().collect(), @@ -506,23 +501,6 @@ impl LocalBarrierWorker { self.drop_actors(&actors); let _ = result_sender.send(()); } - LocalActorOperation::UpdateActors { - actors, - result_sender, - } => { - let result = self.update_actors(actors); - let _ = result_sender.send(result); - } - LocalActorOperation::BuildActors { - actors, - result_sender, - } => self.start_create_actors(&actors, result_sender), - LocalActorOperation::UpdateActorInfo { - new_actor_infos, - result_sender, - } => { - let _ = result_sender.send(self.update_actor_info(new_actor_infos)); - } LocalActorOperation::TakeReceiver { ids, result_sender } => { let _ = result_sender.send(self.current_shared_context.take_receiver(ids)); } @@ -1092,6 +1070,8 @@ pub(crate) mod barrier_test_utils { table_ids_to_sync: vec![], partial_graph_id: u32::MAX, actor_ids_to_pre_sync_barrier_mutation: vec![], + broadcast_info: vec![], + actors_to_build: vec![], }, )), })) diff --git a/src/stream/src/task/barrier_manager/managed_state.rs b/src/stream/src/task/barrier_manager/managed_state.rs index 2bc7fbf88566d..5ccde5004801d 100644 --- a/src/stream/src/task/barrier_manager/managed_state.rs +++ b/src/stream/src/task/barrier_manager/managed_state.rs @@ -364,9 +364,6 @@ impl InflightActorState { } pub(super) struct PartialGraphManagedBarrierState { - /// This is a temporary workaround for the need to still calling `seal_epoch` for storage. - /// Can be removed after `seal_epoch` is deprecated in storage. - need_seal_epoch: bool, /// Record barrier state for each epoch of concurrent checkpoints. 
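The compute node now performs actor-info broadcast, actor registration, and actor creation while handling `InjectBarrierRequest`, rather than through the separate `UpdateActors` / `BuildActors` / `UpdateActorInfo` local operations removed above. The following is a condensed sketch of that handler path, not the real method: error propagation and the remaining request fields are elided, and it assumes the `LocalBarrierWorker` context shown in the diff.

// Condensed sketch of the barrier-driven actor-creation path shown above.
impl LocalBarrierWorker {
    fn on_inject_barrier(&mut self, req: InjectBarrierRequest) -> StreamResult<()> {
        // Actor-host info and actors to build now ride on the barrier request itself.
        self.update_actor_info(req.broadcast_info)?;
        let actor_ids: Vec<_> = req
            .actors_to_build
            .iter()
            .map(|actor| actor.actor.as_ref().unwrap().actor_id)
            .collect();
        self.update_actors(req.actors_to_build)?;
        self.start_create_actors(&actor_ids)?;
        // The barrier is then sent to the (possibly just-spawned) actors for collection.
        Ok(())
    }
}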
/// /// The key is `prev_epoch`, and the first value is `curr_epoch` @@ -390,13 +387,11 @@ pub(super) struct PartialGraphManagedBarrierState { impl PartialGraphManagedBarrierState { fn new( - need_seal_epoch: bool, state_store: StateStoreImpl, streaming_metrics: Arc, barrier_await_tree_reg: Option, ) -> Self { Self { - need_seal_epoch, epoch_barrier_state_map: Default::default(), prev_barrier_table_ids: None, create_mview_progress: Default::default(), @@ -410,7 +405,6 @@ impl PartialGraphManagedBarrierState { #[cfg(test)] pub(crate) fn for_test() -> Self { Self::new( - true, StateStoreImpl::for_test(), Arc::new(StreamingMetrics::unused()), None, @@ -575,7 +569,6 @@ impl ManagedBarrierState { .entry(partial_graph_id) .or_insert_with(|| { PartialGraphManagedBarrierState::new( - partial_graph_id.is_global_graph(), self.state_store.clone(), self.streaming_metrics.clone(), self.barrier_await_tree_reg.clone(), @@ -718,19 +711,9 @@ impl PartialGraphManagedBarrierState { tracing::info!(?prev_epoch, "ignored syncing data for the first barrier"); None } - BarrierKind::Barrier => { - if self.need_seal_epoch { - dispatch_state_store!(&self.state_store, state_store, { - state_store.seal_epoch(prev_epoch, kind.is_checkpoint()); - }); - } - None - } + BarrierKind::Barrier => None, BarrierKind::Checkpoint => { dispatch_state_store!(&self.state_store, state_store, { - if self.need_seal_epoch { - state_store.seal_epoch(prev_epoch, kind.is_checkpoint()); - } Some(sync_epoch( state_store, &self.streaming_metrics, diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index a083cc3974da9..60b7341371497 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -42,7 +42,6 @@ use risingwave_storage::monitor::HummockTraceFutureExt; use risingwave_storage::{dispatch_state_store, StateStore}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; -use tokio::sync::oneshot; use tokio::task::JoinHandle; use tonic::Status; @@ -225,33 +224,6 @@ impl LocalStreamManager { .await } - pub async fn update_actors(&self, actors: Vec) -> StreamResult<()> { - self.actor_op_tx - .send_and_await(|result_sender| LocalActorOperation::UpdateActors { - actors, - result_sender, - }) - .await? - } - - pub async fn build_actors(&self, actors: Vec) -> StreamResult<()> { - self.actor_op_tx - .send_and_await(|result_sender| LocalActorOperation::BuildActors { - actors, - result_sender, - }) - .await? - } - - pub async fn update_actor_info(&self, new_actor_infos: Vec) -> StreamResult<()> { - self.actor_op_tx - .send_and_await(|result_sender| LocalActorOperation::UpdateActorInfo { - new_actor_infos, - result_sender, - }) - .await? - } - pub async fn take_receiver(&self, ids: UpDownActorIds) -> StreamResult { self.actor_op_tx .send_and_await(|result_sender| LocalActorOperation::TakeReceiver { @@ -326,31 +298,18 @@ impl LocalBarrierWorker { /// This function could only be called once during the lifecycle of `LocalStreamManager` for /// now. 
-    pub(super) fn start_create_actors(
-        &mut self,
-        actors: &[ActorId],
-        result_sender: oneshot::Sender<StreamResult<()>>,
-    ) {
-        let actors: Vec<_> = {
-            let actor_result = actors
-                .iter()
-                .map(|actor_id| {
-                    self.actor_manager_state
-                        .actors
-                        .remove(actor_id)
-                        .ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id))
-                })
-                .try_collect();
-            match actor_result {
-                Ok(actors) => actors,
-                Err(e) => {
-                    let _ = result_sender.send(Err(e.into()));
-                    return;
-                }
-            }
-        };
+    pub(super) fn start_create_actors(&mut self, actors: &[ActorId]) -> StreamResult<()> {
+        let actors: Vec<_> = actors
+            .iter()
+            .map(|actor_id| {
+                self.actor_manager_state
+                    .actors
+                    .remove(actor_id)
+                    .ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id))
+            })
+            .try_collect()?;
         self.spawn_actors(actors);
-        let _ = result_sender.send(Ok(()));
+        Ok(())
     }
 }
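Taken together with the storage-side hunks earlier in this diff, the public `StateStore` trait that remains looks roughly like the following. This is a paraphrase for orientation only: bounds, associated types, and methods not touched by this diff are elided or approximated, and the generic parameters (e.g. `HashSet<TableId>`, `Arc<MonitoredStorageMetrics>`) are inferred from the surrounding code rather than copied from it.

// Approximate post-change shape of the trait in src/storage/src/store.rs.
pub trait StateStore: StateStoreRead + StaticSendSync + Clone {
    type Local: LocalStateStore;

    /// Wait until the requested epoch is readable; `NoWait` resolves immediately.
    fn try_wait_epoch(
        &self,
        epoch: HummockReadEpoch,
    ) -> impl Future<Output = StorageResult<()>> + Send + '_;

    /// Sync uncommitted data of `epoch` for the given tables.
    fn sync(&self, epoch: u64, table_ids: HashSet<TableId>) -> impl SyncFuture;

    /// Wrap this store with monitoring, unchanged by this diff.
    fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
        MonitoredStateStore::new(self, storage_metrics)
    }

    fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;

    // `seal_epoch` and `validate_read_epoch` no longer exist on this trait.
}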