diff --git a/proto/meta.proto b/proto/meta.proto index d1f858abbd43a..fe6145ee24055 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -155,9 +155,7 @@ message MigrationPlan { map worker_slot_migration_plan = 2; } -message FlushRequest { - uint32 database_id = 1; -} +message FlushRequest {} message FlushResponse { common.Status status = 1; diff --git a/src/common/src/catalog/mod.rs b/src/common/src/catalog/mod.rs index 23cdf04cdf8a9..86c6e8895c066 100644 --- a/src/common/src/catalog/mod.rs +++ b/src/common/src/catalog/mod.rs @@ -157,7 +157,7 @@ pub trait SysCatalogReader: Sync + Send + 'static { pub type SysCatalogReaderRef = Arc; -#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq, Copy)] +#[derive(Clone, Debug, Default, Display, Hash, PartialOrd, PartialEq, Eq)] #[display("{database_id}")] pub struct DatabaseId { pub database_id: u32, diff --git a/src/frontend/src/handler/flush.rs b/src/frontend/src/handler/flush.rs index 666dca64583ba..f4ed5307f7d95 100644 --- a/src/frontend/src/handler/flush.rs +++ b/src/frontend/src/handler/flush.rs @@ -26,13 +26,7 @@ pub(super) async fn handle_flush(handler_args: HandlerArgs) -> Result Result<()> { let client = session.env().meta_client(); - let database_id = session - .env() - .catalog_reader() - .read_guard() - .get_database_by_name(session.database())? - .id(); - let version_id = client.flush(database_id).await?; + let version_id = client.flush().await?; // Wait for the snapshot to be synchronized, so that future reads in this session can see // previous writes. diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index a91a0d8abc878..2878b84190a2f 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -39,8 +39,6 @@ use risingwave_pb::meta::{EventLog, PbThrottleTarget, RecoveryStatus}; use risingwave_rpc_client::error::Result; use risingwave_rpc_client::{HummockMetaClient, MetaClient}; -use crate::catalog::DatabaseId; - /// A wrapper around the `MetaClient` that only provides a minor set of meta rpc. /// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`, /// `WorkerNodeManager`, etc. So frontend rarely needs to call `MetaClient` directly. 
@@ -50,7 +48,7 @@ use crate::catalog::DatabaseId; pub trait FrontendMetaClient: Send + Sync { async fn try_unregister(&self); - async fn flush(&self, database_id: DatabaseId) -> Result; + async fn flush(&self) -> Result; async fn wait(&self) -> Result<()>; @@ -135,8 +133,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl { self.0.try_unregister().await; } - async fn flush(&self, database_id: DatabaseId) -> Result { - self.0.flush(database_id).await + async fn flush(&self) -> Result { + self.0.flush().await } async fn wait(&self) -> Result<()> { diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 6001e89487cba..46c62d5ff1207 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -917,7 +917,7 @@ pub struct MockFrontendMetaClient {} impl FrontendMetaClient for MockFrontendMetaClient { async fn try_unregister(&self) {} - async fn flush(&self, _database_id: DatabaseId) -> RpcResult { + async fn flush(&self) -> RpcResult { Ok(INVALID_VERSION_ID) } diff --git a/src/meta/service/src/scale_service.rs b/src/meta/service/src/scale_service.rs index 112720d237c2a..889df5b33904c 100644 --- a/src/meta/service/src/scale_service.rs +++ b/src/meta/service/src/scale_service.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use risingwave_common::catalog::{DatabaseId, TableId}; +use risingwave_common::catalog::TableId; use risingwave_meta::manager::MetadataManager; use risingwave_meta::model::TableParallelism; use risingwave_meta::stream::{RescheduleOptions, ScaleControllerRef, WorkerReschedule}; @@ -137,7 +137,6 @@ impl ScaleService for ScaleServiceImpl { self.stream_manager .reschedule_actors( - DatabaseId::new(0), worker_reschedules .into_iter() .map(|(fragment_id, reschedule)| { diff --git a/src/meta/service/src/stream_service.rs b/src/meta/service/src/stream_service.rs index dcc49cc37ba88..a3d30f1a0926f 100644 --- a/src/meta/service/src/stream_service.rs +++ b/src/meta/service/src/stream_service.rs @@ -15,7 +15,7 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; -use risingwave_common::catalog::{DatabaseId, TableId}; +use risingwave_common::catalog::TableId; use risingwave_connector::source::SplitMetaData; use risingwave_meta::manager::{LocalNotification, MetadataManager}; use risingwave_meta::model; @@ -69,9 +69,9 @@ impl StreamManagerService for StreamServiceImpl { #[cfg_attr(coverage, coverage(off))] async fn flush(&self, request: Request) -> TonicResponse { self.env.idle_manager().record_activity(); - let req = request.into_inner(); + let _req = request.into_inner(); - let version_id = self.barrier_scheduler.flush(req.database_id.into()).await?; + let version_id = self.barrier_scheduler.flush().await?; Ok(Response::new(FlushResponse { status: None, hummock_version_id: version_id.to_u64(), @@ -80,34 +80,16 @@ impl StreamManagerService for StreamServiceImpl { #[cfg_attr(coverage, coverage(off))] async fn pause(&self, _: Request) -> Result, Status> { - // TODO: call on database id one by one after supporting database isolation - let database_id = DatabaseId::new( - self.metadata_manager - .list_active_database_ids() - .await? 
- .into_iter() - .next() - .unwrap_or_default() as _, - ); self.barrier_scheduler - .run_command(database_id, Command::pause(PausedReason::Manual)) + .run_command(Command::pause(PausedReason::Manual)) .await?; Ok(Response::new(PauseResponse {})) } #[cfg_attr(coverage, coverage(off))] async fn resume(&self, _: Request) -> Result, Status> { - // TODO: call on database id one by one after supporting database isolation - let database_id = DatabaseId::new( - self.metadata_manager - .list_active_database_ids() - .await? - .into_iter() - .next() - .unwrap_or_default() as _, - ); self.barrier_scheduler - .run_command(database_id, Command::resume(PausedReason::Manual)) + .run_command(Command::resume(PausedReason::Manual)) .await?; Ok(Response::new(ResumeResponse {})) } @@ -140,30 +122,23 @@ impl StreamManagerService for StreamServiceImpl { } }; - for (database_id, actor_to_apply) in self - .metadata_manager - .split_fragment_map_by_database(actor_to_apply) - .await? - { - let database_id = DatabaseId::new(database_id as _); - // TODO: check whether shared source is correct - let mutation: ThrottleConfig = actor_to_apply - .iter() - .map(|(fragment_id, actors)| { - ( - *fragment_id, - actors - .iter() - .map(|actor_id| (*actor_id, request.rate)) - .collect::>>(), - ) - }) - .collect(); - let _i = self - .barrier_scheduler - .run_command(database_id, Command::Throttle(mutation)) - .await?; - } + // TODO: check whether shared source is correct + let mutation: ThrottleConfig = actor_to_apply + .iter() + .map(|(fragment_id, actors)| { + ( + *fragment_id, + actors + .iter() + .map(|actor_id| (*actor_id, request.rate)) + .collect::>>(), + ) + }) + .collect(); + let _i = self + .barrier_scheduler + .run_command(Command::Throttle(mutation)) + .await?; Ok(Response::new(ApplyThrottleResponse { status: None })) } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 185ff41959624..a3ec21ae1f315 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -211,8 +211,7 @@ pub enum CreateStreamingJobType { /// collected. #[derive(Debug, Clone, strum::Display)] pub enum Command { - /// `Flush` command will generate a checkpoint barrier. After the barrier is collected and committed - /// all messages before the checkpoint barrier should have been committed. + /// `Flush` command will generate a checkpoint barrier Flush, /// `Pause` command generates a `Pause` barrier with the provided [`PausedReason`] **only if** diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 489f23788c199..7a8fe28a1cd30 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -25,7 +25,7 @@ use fail::fail_point; use futures::future::try_join_all; use itertools::Itertools; use prometheus::HistogramTimer; -use risingwave_common::catalog::{DatabaseId, TableId}; +use risingwave_common::catalog::TableId; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::system_param::PAUSE_ON_NEXT_BOOTSTRAP_KEY; use risingwave_common::{bail, must_match}; @@ -125,7 +125,7 @@ enum BarrierManagerStatus { /// Scheduled command with its notifiers. 
struct Scheduled { - command: Option<(DatabaseId, Command, Vec)>, + command: Option<(Command, Vec)>, span: tracing::Span, /// Choose a different barrier(checkpoint == true) according to it checkpoint: bool, @@ -823,7 +823,7 @@ impl GlobalBarrierManager { span, } = scheduled; - let (mut command, mut notifiers) = if let Some((_, command, notifiers)) = command { + let (mut command, mut notifiers) = if let Some((command, notifiers)) = command { (Some(command), notifiers) } else { (None, vec![]) diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 8fd698f0d42ea..db73226219128 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -21,7 +21,7 @@ use anyhow::{anyhow, Context}; use assert_matches::assert_matches; use parking_lot::Mutex; use prometheus::HistogramTimer; -use risingwave_common::catalog::{DatabaseId, TableId}; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::HummockVersionId; use risingwave_pb::meta::PausedReason; use tokio::select; @@ -58,7 +58,6 @@ enum QueueStatus { } struct ScheduledQueueItem { - database_id: DatabaseId, command: Command, notifiers: Vec, send_latency_timer: HistogramTimer, @@ -125,7 +124,6 @@ impl Inner { /// Create a new scheduled barrier with the given `checkpoint`, `command` and `notifiers`. fn new_scheduled( &self, - database_id: DatabaseId, command: Command, notifiers: impl IntoIterator, ) -> ScheduledQueueItem { @@ -133,7 +131,6 @@ impl Inner { let span = tracing_span(); ScheduledQueueItem { - database_id, command, notifiers: notifiers.into_iter().collect(), send_latency_timer: self.metrics.barrier_send_latency.start_timer(), @@ -223,11 +220,7 @@ impl BarrierScheduler { /// Returns the barrier info of each command. /// /// TODO: atomicity of multiple commands is not guaranteed. - async fn run_multiple_commands( - &self, - database_id: DatabaseId, - commands: Vec, - ) -> MetaResult<()> { + async fn run_multiple_commands(&self, commands: Vec) -> MetaResult<()> { let mut contexts = Vec::with_capacity(commands.len()); let mut scheduleds = Vec::with_capacity(commands.len()); @@ -237,7 +230,6 @@ impl BarrierScheduler { contexts.push((started_rx, collect_rx)); scheduleds.push(self.inner.new_scheduled( - database_id, command, once(Notifier { started: Some(started_tx), @@ -271,39 +263,31 @@ impl BarrierScheduler { /// configuration change. /// /// Returns the barrier info of the actual command. - pub async fn run_config_change_command_with_pause( - &self, - database_id: DatabaseId, - command: Command, - ) -> MetaResult<()> { - self.run_multiple_commands( - database_id, - vec![ - Command::pause(PausedReason::ConfigChange), - command, - Command::resume(PausedReason::ConfigChange), - ], - ) + pub async fn run_config_change_command_with_pause(&self, command: Command) -> MetaResult<()> { + self.run_multiple_commands(vec![ + Command::pause(PausedReason::ConfigChange), + command, + Command::resume(PausedReason::ConfigChange), + ]) .await } /// Run a command and return when it's completely finished. /// /// Returns the barrier info of the actual command. 
- pub async fn run_command(&self, database_id: DatabaseId, command: Command) -> MetaResult<()> { + pub async fn run_command(&self, command: Command) -> MetaResult<()> { tracing::trace!("run_command: {:?}", command); - let ret = self.run_multiple_commands(database_id, vec![command]).await; + let ret = self.run_multiple_commands(vec![command]).await; tracing::trace!("run_command finished"); ret } /// Flush means waiting for the next barrier to collect. - pub async fn flush(&self, database_id: DatabaseId) -> MetaResult { + pub async fn flush(&self) -> MetaResult { let start = Instant::now(); tracing::debug!("start barrier flush"); - self.run_multiple_commands(database_id, vec![Command::Flush]) - .await?; + self.run_multiple_commands(vec![Command::Flush]).await?; let elapsed = Instant::now().duration_since(start); tracing::debug!("barrier flushed in {:?}", elapsed); @@ -351,7 +335,7 @@ impl ScheduledBarriers { let checkpoint = item.command.need_checkpoint() || checkpoint; item.send_latency_timer.observe_duration(); Scheduled { - command: Some((item.database_id, item.command, item.notifiers)), + command: Some((item.command, item.notifiers)), span: item.span, checkpoint, } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index 9a40d20215e97..08824459e916e 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -57,7 +57,7 @@ use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveModelTrait, ColumnTrait, DatabaseConnection, DatabaseTransaction, EntityTrait, IntoActiveModel, JoinType, PaginatorTrait, QueryFilter, QuerySelect, RelationTrait, - SelectColumns, TransactionTrait, Value, + TransactionTrait, Value, }; use tokio::sync::oneshot::Sender; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -103,7 +103,6 @@ pub struct CatalogController { #[derive(Clone, Default)] pub struct ReleaseContext { - pub(crate) database_id: DatabaseId, pub(crate) streaming_job_ids: Vec, /// Dropped state table list, need to unregister from hummock. pub(crate) state_table_ids: Vec, @@ -419,7 +418,6 @@ impl CatalogController { .await; Ok(( ReleaseContext { - database_id, streaming_job_ids: streaming_jobs, state_table_ids, source_ids, @@ -432,17 +430,6 @@ impl CatalogController { )) } - pub async fn get_object_database_id(&self, object_id: ObjectId) -> MetaResult { - let inner = self.inner.read().await; - let (database_id,): (Option,) = Object::find_by_id(object_id) - .select_column(object::Column::DatabaseId) - .into_tuple() - .one(&inner.db) - .await? - .ok_or_else(|| MetaError::catalog_id_not_found("object", object_id))?; - Ok(database_id.ok_or_else(|| anyhow!("object has no database id: {object_id}"))?) - } - pub async fn create_schema(&self, schema: PbSchema) -> MetaResult { let inner = self.inner.write().await; let owner_id = schema.owner as _; @@ -2122,9 +2109,6 @@ impl CatalogController { .await? 
.ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?; assert_eq!(obj.obj_type, object_type); - let database_id = obj - .database_id - .ok_or_else(|| anyhow!("dropped object should have database_id"))?; let mut to_drop_objects = match drop_mode { DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?, @@ -2289,18 +2273,6 @@ impl CatalogController { to_drop_objects.extend(to_drop_internal_table_objs); } - let to_drop_objects = to_drop_objects; - - to_drop_objects.iter().for_each(|obj| { - if let Some(obj_database_id) = obj.database_id { - assert_eq!( - database_id, obj_database_id, - "dropped objects not in the same database: {:?}", - obj - ); - } - }); - let (source_fragments, removed_actors, removed_fragments) = resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?; @@ -2352,7 +2324,6 @@ impl CatalogController { Ok(( ReleaseContext { - database_id, streaming_job_ids: to_drop_streaming_jobs, state_table_ids: to_drop_state_table_ids, source_ids: to_drop_source_ids, diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 220ee8815393f..e118ad3c9683c 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -26,9 +26,9 @@ use risingwave_meta_model::actor::ActorStatus; use risingwave_meta_model::fragment::DistributionType; use risingwave_meta_model::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; use risingwave_meta_model::{ - actor, actor_dispatcher, fragment, object, sink, streaming_job, ActorId, ActorUpstreamActors, - ConnectorSplits, DatabaseId, ExprContext, FragmentId, I32Array, JobStatus, ObjectId, SinkId, - SourceId, StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, + actor, actor_dispatcher, fragment, sink, streaming_job, ActorId, ActorUpstreamActors, + ConnectorSplits, ExprContext, FragmentId, I32Array, JobStatus, ObjectId, SinkId, SourceId, + StreamNode, StreamingParallelism, TableId, VnodeBitmap, WorkerId, }; use risingwave_meta_model_migration::{Alias, SelectStatement}; use risingwave_pb::common::PbActorLocation; @@ -51,8 +51,8 @@ use risingwave_pb::stream_plan::{ use sea_orm::sea_query::Expr; use sea_orm::ActiveValue::Set; use sea_orm::{ - ColumnTrait, DbErr, EntityOrSelect, EntityTrait, JoinType, ModelTrait, PaginatorTrait, - QueryFilter, QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value, + ColumnTrait, DbErr, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter, + QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value, }; use crate::controller::catalog::{CatalogController, CatalogControllerInner}; @@ -659,24 +659,6 @@ impl CatalogController { Ok(object_ids) } - pub async fn list_fragment_database_ids( - &self, - select_fragment_ids: Option>, - ) -> MetaResult> { - let inner = self.inner.read().await; - let select = Fragment::find() - .select() - .column(fragment::Column::FragmentId) - .column(object::Column::DatabaseId) - .join(JoinType::InnerJoin, fragment::Relation::Object.def()); - let select = if let Some(select_fragment_ids) = select_fragment_ids { - select.filter(fragment::Column::FragmentId.is_in(select_fragment_ids)) - } else { - select - }; - Ok(select.into_tuple().all(&inner.db).await?) 
- } - pub async fn get_job_fragments_by_id(&self, job_id: ObjectId) -> MetaResult { let inner = self.inner.read().await; let fragment_actors = Fragment::find() diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 64d484bc89107..e8adc309d10e2 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -15,7 +15,6 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::num::NonZeroUsize; -use anyhow::anyhow; use itertools::Itertools; use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; @@ -455,27 +454,23 @@ impl CatalogController { } /// `try_abort_creating_streaming_job` is used to abort the job that is under initial status or in `FOREGROUND` mode. - /// It returns (true, _) if the job is not found or aborted. - /// It returns (_, Some(`database_id`)) is the `database_id` of the `job_id` exists + /// It returns true if the job is not found or aborted. pub async fn try_abort_creating_streaming_job( &self, job_id: ObjectId, is_cancelled: bool, - ) -> MetaResult<(bool, Option)> { + ) -> MetaResult { let mut inner = self.inner.write().await; let txn = inner.db.begin().await?; - let obj = Object::find_by_id(job_id).one(&txn).await?; - let Some(obj) = obj else { + let cnt = Object::find_by_id(job_id).count(&txn).await?; + if cnt == 0 { tracing::warn!( id = job_id, "streaming job not found when aborting creating, might be cleaned by recovery" ); - return Ok((true, None)); - }; - let database_id = obj - .database_id - .ok_or_else(|| anyhow!("obj has no database id: {:?}", obj))?; + return Ok(true); + } if !is_cancelled { let streaming_job = streaming_job::Entity::find_by_id(job_id).one(&txn).await?; @@ -489,7 +484,7 @@ impl CatalogController { id = job_id, "streaming job is created in background and still in creating status" ); - return Ok((false, Some(database_id))); + return Ok(false); } } } @@ -573,7 +568,7 @@ impl CatalogController { self.notify_frontend(Operation::Delete, build_relation_group(objs)) .await; } - Ok((true, Some(database_id))) + Ok(true) } pub async fn post_collect_table_fragments( diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index df54db646dced..48dfbfaf55c0a 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -13,14 +13,13 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap, HashSet}; -use std::fmt::Debug; use std::pin::pin; use std::time::Duration; use anyhow::anyhow; use futures::future::{select, Either}; use risingwave_common::catalog::{TableId, TableOption}; -use risingwave_meta_model::{DatabaseId, ObjectId, SourceId, WorkerId}; +use risingwave_meta_model::{ObjectId, SourceId, WorkerId}; use risingwave_pb::catalog::{PbSink, PbSource, PbTable}; use risingwave_pb::common::worker_node::{PbResource, State}; use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType}; @@ -264,47 +263,6 @@ impl MetadataManager { self.cluster_controller.list_active_serving_workers().await } - pub async fn list_active_database_ids(&self) -> MetaResult> { - Ok(self - .catalog_controller - .list_fragment_database_ids(None) - .await? 
- .into_iter() - .map(|(_, database_id)| database_id) - .collect()) - } - - pub async fn split_fragment_map_by_database( - &self, - fragment_map: HashMap, - ) -> MetaResult>> { - let fragment_to_database_map: HashMap<_, _> = self - .catalog_controller - .list_fragment_database_ids(Some( - fragment_map - .keys() - .map(|fragment_id| *fragment_id as _) - .collect(), - )) - .await? - .into_iter() - .map(|(fragment_id, database_id)| { - (fragment_id as FragmentId, database_id as DatabaseId) - }) - .collect(); - let mut ret: HashMap<_, HashMap<_, _>> = HashMap::new(); - for (fragment_id, value) in fragment_map { - let database_id = *fragment_to_database_map - .get(&fragment_id) - .ok_or_else(|| anyhow!("cannot get database_id of fragment {fragment_id}"))?; - ret.entry(database_id) - .or_default() - .try_insert(fragment_id, value) - .expect("non duplicate"); - } - Ok(ret) - } - pub async fn list_background_creating_jobs(&self) -> MetaResult> { let tables = self .catalog_controller diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index d8d088731ad70..995643215d317 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -605,20 +605,19 @@ impl DdlController { ) -> MetaResult { tracing::debug!("preparing drop subscription"); let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; - let subscription = self + let table_id = self .metadata_manager .catalog_controller .get_subscription_by_id(subscription_id) - .await?; - let table_id = subscription.dependent_table_id; - let database_id = subscription.database_id.into(); + .await? + .dependent_table_id; let (_, version) = self .metadata_manager .catalog_controller .drop_relation(ObjectType::Subscription, subscription_id as _, drop_mode) .await?; self.stream_manager - .drop_subscription(database_id, subscription_id as _, table_id) + .drop_subscription(subscription_id as _, table_id) .await; tracing::debug!("finish drop subscription"); Ok(version) @@ -1010,7 +1009,7 @@ impl DdlController { self.env.event_log_manager_ref().add_event_logs(vec![ risingwave_pb::meta::event_log::Event::CreateStreamJobFail(event), ]); - let (aborted, _) = self + let aborted = self .metadata_manager .catalog_controller .try_abort_creating_streaming_job(job_id as _, false) @@ -1277,7 +1276,6 @@ impl DdlController { } let ReleaseContext { - database_id, streaming_job_ids, state_table_ids, source_ids, @@ -1321,7 +1319,6 @@ impl DdlController { // drop streaming jobs. 
self.stream_manager .drop_streaming_jobs( - risingwave_common::catalog::DatabaseId::new(database_id as _), removed_actors.into_iter().map(|id| id as _).collect(), streaming_job_ids, state_table_ids, diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index d427de3590221..2dbd6364d4e2c 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -27,7 +27,7 @@ use num_integer::Integer; use num_traits::abs; use risingwave_common::bail; use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; -use risingwave_common::catalog::{DatabaseId, TableId}; +use risingwave_common::catalog::TableId; use risingwave_common::hash::ActorMapping; use risingwave_common::util::iter_util::ZipEqDebug; use risingwave_meta_model::{actor, fragment, ObjectId, StreamingParallelism, WorkerId}; @@ -850,7 +850,7 @@ impl ScaleController { /// - `reschedule_fragment`: the generated reschedule plan /// - `applied_reschedules`: the changes that need to be updated to the meta store (`pre_apply_reschedules`, only for V1). /// - /// In [normal process of scaling](`GlobalStreamManager::reschedule_actors`), we use the returned values to + /// In [normal process of scaling](`GlobalStreamManager::reschedule_actors_impl`), we use the returned values to /// build a [`Command::RescheduleFragment`], which will then flows through the barrier mechanism to perform scaling. /// Meta store is updated after the barrier is collected. /// @@ -2312,7 +2312,16 @@ impl GlobalStreamManager { /// * automatic parallelism control for [`TableParallelism::Adaptive`] when worker nodes changed pub async fn reschedule_actors( &self, - database_id: DatabaseId, + reschedules: HashMap, + options: RescheduleOptions, + table_parallelism: Option>, + ) -> MetaResult<()> { + self.reschedule_actors_impl(reschedules, options, table_parallelism) + .await + } + + async fn reschedule_actors_impl( + &self, reschedules: HashMap, options: RescheduleOptions, table_parallelism: Option>, @@ -2359,7 +2368,7 @@ impl GlobalStreamManager { let _source_pause_guard = self.source_manager.paused.lock().await; self.barrier_scheduler - .run_config_change_command_with_pause(database_id, command) + .run_config_change_command_with_pause(command) .await?; tracing::info!("reschedule done"); @@ -2488,22 +2497,15 @@ impl GlobalStreamManager { return Ok(false); }; - for (database_id, reschedules) in self - .metadata_manager - .split_fragment_map_by_database(reschedules) - .await? 
- { - self.reschedule_actors( - DatabaseId::new(database_id as _), - reschedules, - RescheduleOptions { - resolve_no_shuffle_upstream: false, - skip_create_new_actors: false, - }, - None, - ) - .await?; - } + self.reschedule_actors( + reschedules, + RescheduleOptions { + resolve_no_shuffle_upstream: false, + skip_create_new_actors: false, + }, + None, + ) + .await?; Ok(true) } diff --git a/src/meta/src/stream/source_manager.rs b/src/meta/src/stream/source_manager.rs index 0cfb820ecb692..c5bcc0c179ba3 100644 --- a/src/meta/src/stream/source_manager.rs +++ b/src/meta/src/stream/source_manager.rs @@ -29,7 +29,7 @@ use risingwave_connector::source::{ SplitEnumerator, SplitId, SplitImpl, SplitMetaData, }; use risingwave_connector::{dispatch_source_prop, WithOptionsSecResolved}; -use risingwave_meta_model::{DatabaseId, SourceId}; +use risingwave_meta_model::SourceId; use risingwave_pb::catalog::Source; use risingwave_pb::source::{ConnectorSplit, ConnectorSplits}; use risingwave_pb::stream_plan::Dispatcher; @@ -315,7 +315,7 @@ impl SourceManagerCore { /// /// `self.actor_splits` will not be updated. It will be updated by `Self::apply_source_change`, /// after the mutation barrier has been collected. - async fn reassign_splits(&self) -> MetaResult> { + async fn reassign_splits(&self) -> MetaResult { let mut split_assignment: SplitAssignment = HashMap::new(); for (source_id, handle) in &self.managed_sources { @@ -421,9 +421,7 @@ impl SourceManagerCore { } } - self.metadata_manager - .split_fragment_map_by_database(split_assignment) - .await + Ok(split_assignment) } fn apply_source_change( @@ -1125,16 +1123,9 @@ impl SourceManager { }; if !split_assignment.is_empty() { - for (database_id, split_assignment) in split_assignment { - let command = Command::SourceSplitAssignment(split_assignment); - tracing::info!(command = ?command, "pushing down split assignment command"); - self.barrier_scheduler - .run_command( - risingwave_common::catalog::DatabaseId::new(database_id as _), - command, - ) - .await?; - } + let command = Command::SourceSplitAssignment(split_assignment); + tracing::info!(command = ?command, "pushing down split assignment command"); + self.barrier_scheduler.run_command(command).await?; } Ok(()) diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index b55868e8d6535..21a642f1be1c7 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use futures::future::join_all; use itertools::Itertools; use risingwave_common::bail; -use risingwave_common::catalog::{DatabaseId, TableId}; +use risingwave_common::catalog::TableId; use risingwave_meta_model::{ObjectId, WorkerId}; use risingwave_pb::catalog::{CreateType, Subscription, Table}; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; @@ -238,7 +238,6 @@ impl GlobalStreamManager { ctx: CreateStreamingJobContext, ) -> MetaResult { let table_id = table_fragments.table_id(); - let database_id = ctx.streaming_job.database_id().into(); let (sender, mut receiver) = tokio::sync::mpsc::channel(10); let execution = StreamingJobExecution::new(table_id, sender.clone()); self.creating_job_info.add_job(execution).await; @@ -299,10 +298,7 @@ impl GlobalStreamManager { .await?; self.barrier_scheduler - .run_command( - database_id, - Command::CancelStreamingJob(table_fragments), - ) + .run_command(Command::CancelStreamingJob(table_fragments)) .await?; } else { // streaming job is already completed. 
@@ -426,15 +422,10 @@ impl GlobalStreamManager { if need_pause { // Special handling is required when creating sink into table, we need to pause the stream to avoid data loss. self.barrier_scheduler - .run_config_change_command_with_pause( - streaming_job.database_id().into(), - command, - ) + .run_config_change_command_with_pause(command) .await?; } else { - self.barrier_scheduler - .run_command(streaming_job.database_id().into(), command) - .await?; + self.barrier_scheduler.run_command(command).await?; } tracing::debug!(?streaming_job, "first barrier collected for stream job"); @@ -477,18 +468,15 @@ impl GlobalStreamManager { let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?; self.barrier_scheduler - .run_config_change_command_with_pause( - streaming_job.database_id().into(), - Command::ReplaceTable(ReplaceTablePlan { - old_table_fragments, - new_table_fragments: table_fragments, - merge_updates, - dispatchers, - init_split_assignment, - dummy_id, - streaming_job, - }), - ) + .run_config_change_command_with_pause(Command::ReplaceTable(ReplaceTablePlan { + old_table_fragments, + new_table_fragments: table_fragments, + merge_updates, + dispatchers, + init_split_assignment, + dummy_id, + streaming_job, + })) .await?; Ok(()) @@ -499,7 +487,6 @@ impl GlobalStreamManager { /// [`Command::DropStreamingJobs`] for details. pub async fn drop_streaming_jobs( &self, - database_id: DatabaseId, removed_actors: Vec, streaming_job_ids: Vec, state_table_ids: Vec, @@ -511,17 +498,14 @@ impl GlobalStreamManager { { let _ = self .barrier_scheduler - .run_command( - database_id, - Command::DropStreamingJobs { - actors: removed_actors, - unregistered_state_table_ids: state_table_ids - .into_iter() - .map(|table_id| TableId::new(table_id as _)) - .collect(), - unregistered_fragment_ids: fragment_ids, - }, - ) + .run_command(Command::DropStreamingJobs { + actors: removed_actors, + unregistered_state_table_ids: state_table_ids + .into_iter() + .map(|table_id| TableId::new(table_id as _)) + .collect(), + unregistered_fragment_ids: fragment_ids, + }) .await .inspect_err(|err| { tracing::error!(error = ?err.as_report(), "failed to run drop command"); @@ -569,16 +553,14 @@ impl GlobalStreamManager { )))?; } - let (_, database_id) = self.metadata_manager + self.metadata_manager .catalog_controller .try_abort_creating_streaming_job(id.table_id as _, true) .await?; - if let Some(database_id) = database_id { - self.barrier_scheduler - .run_command(DatabaseId::new(database_id as _), Command::CancelStreamingJob(fragment)) - .await?; - } + self.barrier_scheduler + .run_command(Command::CancelStreamingJob(fragment)) + .await?; }; match result { Ok(_) => { @@ -605,12 +587,6 @@ impl GlobalStreamManager { ) -> MetaResult<()> { let _reschedule_job_lock = self.reschedule_lock_write_guard().await; - let database_id = DatabaseId::new( - self.metadata_manager - .catalog_controller - .get_object_database_id(table_id as ObjectId) - .await? 
as _, - ); let table_id = TableId::new(table_id); let worker_nodes = self @@ -689,7 +665,6 @@ impl GlobalStreamManager { .await?; } else { self.reschedule_actors( - database_id, reschedules, RescheduleOptions { resolve_no_shuffle_upstream: false, @@ -716,19 +691,12 @@ impl GlobalStreamManager { }; tracing::debug!("sending Command::CreateSubscription"); - self.barrier_scheduler - .run_command(subscription.database_id.into(), command) - .await?; + self.barrier_scheduler.run_command(command).await?; Ok(()) } // Don't need to add actor, just send a command - pub async fn drop_subscription( - self: &Arc, - database_id: DatabaseId, - subscription_id: u32, - table_id: u32, - ) { + pub async fn drop_subscription(self: &Arc, subscription_id: u32, table_id: u32) { let command = Command::DropSubscription { subscription_id, upstream_mv_table_id: TableId::new(table_id), @@ -737,7 +705,7 @@ impl GlobalStreamManager { tracing::debug!("sending Command::DropSubscriptions"); let _ = self .barrier_scheduler - .run_command(database_id, command) + .run_command(command) .await .inspect_err(|err| { tracing::error!(error = ?err.as_report(), "failed to run drop command"); diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 0b096e53ddfa9..f4f2fc2777376 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -905,8 +905,8 @@ impl MetaClient { Ok(resp.tables) } - pub async fn flush(&self, database_id: DatabaseId) -> Result { - let request = FlushRequest { database_id }; + pub async fn flush(&self) -> Result { + let request = FlushRequest {}; let resp = self.inner.flush(request).await?; Ok(HummockVersionId::new(resp.hummock_version_id)) }