-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(sql-backend): [PART 2] introduce metadata manager and adapt it in all RPC services, dashboard, metric etc #14191
Conversation
…shboard, metric etc
use crate::MetaResult; | ||
|
||
#[derive(Clone)] | ||
pub enum MetadataManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MetadataManager
introduced here, providing two metadata managers based on different backend implementations. All RPC services, internal managers should use it instead to acquire and modify metadata.
@@ -15,8 +15,7 @@ message CreateDatabaseRequest { | |||
|
|||
message CreateDatabaseResponse { | |||
common.Status status = 1; | |||
uint32 database_id = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to return the id for all create ddl, they are never used. And it doesn't have any compatible issues, since they are never persisted.
@@ -170,17 +154,14 @@ impl DdlService for DdlServiceImpl { | |||
request: Request<CreateSchemaRequest>, | |||
) -> Result<Response<CreateSchemaResponse>, Status> { | |||
let req = request.into_inner(); | |||
let id = self.gen_unique_id::<{ IdCategory::Schema }>().await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved gen_unique_id
to ddl controller, they will be called only in metadata manager V1 version.
.await; | ||
let result = try { | ||
// Add table fragments to meta store with state: `State::Initial`. | ||
mgr.fragment_manager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forward storing table fragments to make create_streaming_job
in stream manager compatible with two versions of metadata manager.
let id_gen = GlobalActorIdGen::new(&id_gen_manager, actor_len).await?; | ||
|
||
// TODO: use sql_id_gen that is not implemented yet. | ||
let id_gen = GlobalActorIdGen::new(env.id_gen_manager(), actor_len).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will replace the id_gen of sql-backend version in meta env in later PR, so we changed the parameter of id_gen to meta env.
@@ -740,10 +752,130 @@ pub fn start_worker_info_monitor( | |||
(join_handle, shutdown_tx) | |||
} | |||
|
|||
pub async fn refresh_fragment_info_metrics_v2( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Metrics collection based on metadata manager v2.
I just added some comments for better review. 🥵 Rest of them are just some adaptation logic for metadata manager. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall. We'll probably need to run some more tests once we've combined PART3 and PART4.
source_manager, | ||
env, | ||
}, | ||
MetadataManager::V2(_) => unimplemented!("support v2 in scale controller"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll write an additional PR to handle this
{ | ||
// If the table fragments are not found, it means that the stream job has not been created. | ||
Err(err) => err.is_fragment_not_found(), | ||
Ok(table_fragments) => table_fragments.is_initial(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed a corner case: if the table fragments exists but is still in initial state, it should also should be cancelled. Because the actors are not marked as running yet and their upstream dispatchers are not persist after associated barrier get collected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
COOL 🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀 |
After this PR, the sink into table e2e fails. Details described in #14297 cc @shanicky @yezizp2012 |
🥵 Let me take a look, thx for report. |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Part 2 of #13949 . Introduced metadata manager and adapt it in all RPC services, all kind of managers, dashboard, metric etc.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.