-
Notifications
You must be signed in to change notification settings - Fork 328
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(remote_wal): make meta srv able to alloc kafka topics #2753
Conversation
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## develop #2753 +/- ##
===========================================
- Coverage 84.68% 84.25% -0.43%
===========================================
Files 730 735 +5
Lines 113711 113960 +249
===========================================
- Hits 96291 96012 -279
- Misses 17420 17948 +528 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests are too few, especially the cases for kafka. I think it's a must to do it first.
return None; | ||
} | ||
|
||
Some(kafka_ctrl_client.create_topic( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does rskafka
return an error if the topic already exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a TopicAlreadyExists
Error in rskafka, but it's never used except in one test.
See: https://github.com/influxdata/rskafka/blob/a9a16d51eba40dbce7a5d976f2a7094dfc0a7e17/src/protocol/error.rs#L48
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to further determine how rskafka handles an already-exist topic.
a8a0f7e
to
e77f0fa
Compare
Waiting for #2816 to be merged. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- For metadata, we need to consider the ability of backward compatibility; it's not easy to make breaking changes in the production environment.
- I think Kafka WAL will be replaced with our own WAL service in the future. The code should be more general, and we can migrate to the new WAL service easily.
#[snafu(display("Failed to deserialize Kafka topics"))] | ||
DeserKafkaTopics { | ||
location: Location, | ||
#[snafu(source)] | ||
error: JsonError, | ||
}, | ||
|
||
#[snafu(display("Failed to serialize Kafka topics"))] | ||
SerKafkaTopics { | ||
location: Location, | ||
#[snafu(source)] | ||
error: JsonError, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about using DecodeJson
and EncodeJson
?
.await | ||
.context(CreateKafkaTopicSnafu)?; | ||
|
||
persist_created_topics(&topics, kv_backend).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should add some info logs for creating topics.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do so.
@niebayes There are some comments, are u currently working on addressing them? Let's discuss if you have any problems. |
I hereby agree to the terms of the GreptimeDB CLA
What's changed and what's your intention?
This PR is part of the Kafka remote wal project.
It adds Kafka stuff to meta srv which is now able to allocate Kafka topics for regions while handling create table requests.
Details
There resides a
WalMetaAllocator
inMetaSrvTablemetadataAllocator
. When thecreate
inferface of theMetaSrvTablemetadataAllocator
is invoked, a number of topics will be allocated byWalMetaAllocator
. Those topics will be assigned to regions when executingCreateTableProcedure
by the procedure manager.A region's topic is stored in
RegionOptions
so that it could be used in opening and creating regions by datanode.Checklist
Refer to a related PR or issue link (optional)