Skip to content

Commit

Permalink
refactor: remove MetadataService
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 12, 2023
1 parent c5407b5 commit f82737b
Show file tree
Hide file tree
Showing 12 changed files with 21 additions and 192 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ etcd-client.workspace = true
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
metrics.workspace = true
prost.workspace = true
regex.workspace = true
serde.workspace = true
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/key/catalog_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use futures::stream::BoxStream;
use futures::StreamExt;
use metrics::increment_counter;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

Expand Down Expand Up @@ -108,6 +109,7 @@ impl CatalogManager {
.with_key(raw_key)
.with_value(CatalogNameValue.try_as_raw_value()?);
self.kv_backend.put(req).await?;
increment_counter!(crate::metrics::METRIC_META_CREATE_CATALOG);

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/key/schema_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use futures::stream::BoxStream;
use futures::StreamExt;
use humantime_serde::re::humantime;
use metrics::increment_counter;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

Expand Down Expand Up @@ -149,6 +150,7 @@ impl SchemaManager {
.with_value(value.unwrap_or_default().try_as_raw_value()?);

self.kv_backend.put(req).await?;
increment_counter!(crate::metrics::METRIC_META_CREATE_SCHEMA);

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

pub const METRIC_META_TXN_REQUEST: &str = "meta.txn_request";

pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog";
pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema";
pub(crate) const METRIC_META_PROCEDURE_CREATE_TABLE: &str = "meta.procedure.create_table";
pub(crate) const METRIC_META_PROCEDURE_DROP_TABLE: &str = "meta.procedure.drop_table";
pub(crate) const METRIC_META_PROCEDURE_ALTER_TABLE: &str = "meta.procedure.alter_table";
8 changes: 8 additions & 0 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ use crate::pubsub::Message;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to create default catalog and schema, source: {}", source))]
InitMetadata {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to allocate next sequence number: {}", source))]
NextSequence {
location: Location,
Expand Down Expand Up @@ -612,6 +618,8 @@ impl ErrorExt for Error {
| Error::ConvertEtcdTxnObject { source, .. }
| Error::GetFullTableInfo { source, .. } => source.status_code(),

Error::InitMetadata { source, .. } => source.status_code(),

Error::Other { source, .. } => source.status_code(),
}
}
Expand Down
1 change: 0 additions & 1 deletion src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub mod handler;
pub mod keys;
pub mod lease;
pub mod lock;
pub mod metadata_service;
pub mod metasrv;
mod metrics;
#[cfg(feature = "mock")]
Expand Down
157 changes: 0 additions & 157 deletions src/meta-srv/src/metadata_service.rs

This file was deleted.

10 changes: 4 additions & 6 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::Peer;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::ddl::DdlTaskExecutorRef;
Expand All @@ -36,10 +35,9 @@ use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{RecoverProcedureSnafu, Result};
use crate::error::{InitMetadataSnafu, RecoverProcedureSnafu, Result};
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::metadata_service::MetadataServiceRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
Expand Down Expand Up @@ -196,7 +194,6 @@ pub struct MetaSrv {
election: Option<ElectionRef>,
lock: DistLockRef,
procedure_manager: ProcedureManagerRef,
metadata_service: MetadataServiceRef,
mailbox: MailboxRef,
ddl_executor: DdlTaskExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
Expand Down Expand Up @@ -296,9 +293,10 @@ impl MetaSrv {
}

async fn create_default_schema_if_not_exist(&self) -> Result<()> {
self.metadata_service
.create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true)
self.table_metadata_manager
.init()
.await
.context(InitMetadataSnafu)
}

pub fn shutdown(&self) {
Expand Down
19 changes: 0 additions & 19 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use crate::handler::response_header_handler::ResponseHeaderHandler;
use crate::handler::{HeartbeatHandlerGroup, HeartbeatMailbox, Pushers};
use crate::lock::memory::MemLock;
use crate::lock::DistLockRef;
use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef};
use crate::metasrv::{
ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
Expand All @@ -66,7 +65,6 @@ pub struct MetaSrvBuilder {
election: Option<ElectionRef>,
meta_peer_client: Option<MetaPeerClientRef>,
lock: Option<DistLockRef>,
metadata_service: Option<MetadataServiceRef>,
datanode_clients: Option<Arc<DatanodeClients>>,
pubsub: Option<(PublishRef, SubscribeManagerRef)>,
}
Expand All @@ -82,7 +80,6 @@ impl MetaSrvBuilder {
election: None,
options: None,
lock: None,
metadata_service: None,
datanode_clients: None,
pubsub: None,
}
Expand Down Expand Up @@ -128,11 +125,6 @@ impl MetaSrvBuilder {
self
}

pub fn metadata_service(mut self, metadata_service: MetadataServiceRef) -> Self {
self.metadata_service = Some(metadata_service);
self
}

pub fn datanode_clients(mut self, clients: Arc<DatanodeClients>) -> Self {
self.datanode_clients = Some(clients);
self
Expand All @@ -155,7 +147,6 @@ impl MetaSrvBuilder {
selector,
handler_group,
lock,
metadata_service,
datanode_clients,
pubsub,
} = self;
Expand All @@ -174,10 +165,7 @@ impl MetaSrvBuilder {
let kv_backend = KvBackendAdapter::wrap(kv_store.clone());
let table_id_sequence = Arc::new(Sequence::new(TABLE_ID_SEQ, 1024, 10, kv_backend.clone()));
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let metadata_service = metadata_service
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(table_metadata_manager)));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));
let table_metadata_manager = build_table_metadata_manager(&kv_store);
let ctx = SelectorContext {
datanode_lease_secs: options.datanode_lease_secs,
server_addr: options.server_addr.clone(),
Expand Down Expand Up @@ -275,7 +263,6 @@ impl MetaSrvBuilder {
election,
lock,
procedure_manager,
metadata_service,
mailbox,
ddl_executor: ddl_manager,
table_metadata_manager,
Expand Down Expand Up @@ -333,12 +320,6 @@ fn build_procedure_manager(options: &MetaSrvOptions, kv_store: &KvStoreRef) -> P
Arc::new(LocalManager::new(manager_config, state_store))
}

fn build_table_metadata_manager(kv_store: &KvStoreRef) -> TableMetadataManagerRef {
Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)))
}

fn build_ddl_manager(
options: &MetaSrvOptions,
datanode_clients: Option<Arc<DatanodeClients>>,
Expand Down
2 changes: 0 additions & 2 deletions src/meta-srv/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) const METRIC_META_CREATE_CATALOG: &str = "meta.create_catalog";
pub(crate) const METRIC_META_CREATE_SCHEMA: &str = "meta.create_schema";
pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request";
pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";
Expand Down
8 changes: 1 addition & 7 deletions src/meta-srv/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use client::client_manager::DatanodeClients;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::key::TableMetadataManager;
use tower::service_fn;

use crate::metadata_service::{DefaultMetadataService, MetadataService};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use crate::service::store::etcd::EtcdStore;
Expand Down Expand Up @@ -64,12 +62,8 @@ pub async fn mock(
let table_metadata_manager = Arc::new(TableMetadataManager::new(KvBackendAdapter::wrap(
kv_store.clone(),
)));
let metadata_service = DefaultMetadataService::new(table_metadata_manager);

metadata_service
.create_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, true)
.await
.unwrap();
table_metadata_manager.init().await.unwrap();

let builder = MetaSrvBuilder::new().options(opts).kv_store(kv_store);

Expand Down

0 comments on commit f82737b

Please sign in to comment.