From 5f0d5cf83ce17239fdd757df2a0a9e0423df4cd6 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Wed, 13 Dec 2023 19:41:46 +0800 Subject: [PATCH 1/2] feat: add '--store-key-prefix' to support multiple meta instances can use the same etcd backend --- src/cmd/src/metasrv.rs | 6 ++++++ src/meta-srv/src/bootstrap.rs | 25 ++++++++++++++++++++--- src/meta-srv/src/election/etcd.rs | 27 +++++++++++++++++++----- src/meta-srv/src/lock/etcd.rs | 34 ++++++++++++++++++++++++------- src/meta-srv/src/metasrv.rs | 2 ++ 5 files changed, 79 insertions(+), 15 deletions(-) diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index ba41649a8e9c..6653d2431c13 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -117,6 +117,10 @@ struct StartCommand { /// The working home directory of this metasrv instance. #[clap(long)] data_home: Option, + + /// If it's not empty, the metasrv will store all data with this key prefix. + #[clap(long)] + store_key_prefix: Option, } impl StartCommand { @@ -173,6 +177,8 @@ impl StartCommand { opts.data_home = data_home.clone(); } + opts.store_key_prefix = self.store_key_prefix.clone(); + // Disable dashboard in metasrv. opts.http.disable_dashboard = true; diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 5e5361bf6a76..196ded82b9b5 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -20,6 +20,7 @@ use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::lock_server::LockServer; use api::v1::meta::store_server::StoreServer; use common_base::Plugins; +use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; @@ -189,10 +190,28 @@ pub async fn metasrv_builder( ), (None, false) => { let etcd_client = create_etcd_client(opts).await?; + let kv_backend = { + let etcd_backend = EtcdStore::with_etcd_client(etcd_client.clone()); + if let Some(prefix) = opts.store_key_prefix.clone() { + Arc::new(ChrootKvBackend::new(prefix.into_bytes(), etcd_backend)) + } else { + etcd_backend + } + }; ( - EtcdStore::with_etcd_client(etcd_client.clone()), - Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?), - Some(EtcdLock::with_etcd_client(etcd_client)?), + kv_backend, + Some( + EtcdElection::with_etcd_client( + &opts.server_addr, + etcd_client.clone(), + opts.store_key_prefix.clone(), + ) + .await?, + ), + Some(EtcdLock::with_etcd_client( + etcd_client, + opts.store_key_prefix.clone(), + )?), ) } }; diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 30dcc5518d5b..992be6d61a53 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -35,10 +35,15 @@ pub struct EtcdElection { is_leader: AtomicBool, infancy: AtomicBool, leader_watcher: broadcast::Sender, + store_key_prefix: Option, } impl EtcdElection { - pub async fn with_endpoints(leader_value: E, endpoints: S) -> Result + pub async fn with_endpoints( + leader_value: E, + endpoints: S, + store_key_prefix: Option, + ) -> Result where E: AsRef, S: AsRef<[E]>, @@ -47,10 +52,14 @@ impl EtcdElection { .await .context(error::ConnectEtcdSnafu)?; - Self::with_etcd_client(leader_value, client).await + Self::with_etcd_client(leader_value, client, store_key_prefix).await } - pub async fn with_etcd_client(leader_value: E, client: Client) -> Result + pub async fn with_etcd_client( + leader_value: E, + client: Client, + store_key_prefix: Option, + ) -> Result where E: AsRef, { @@ -91,8 +100,16 @@ impl EtcdElection { is_leader: AtomicBool::new(false), infancy: AtomicBool::new(false), leader_watcher: tx, + store_key_prefix, })) } + + fn election_key(&self) -> String { + match &self.store_key_prefix { + Some(prefix) => format!("{}{}", prefix, ELECTION_KEY), + None => ELECTION_KEY.to_string(), + } + } } #[async_trait::async_trait] @@ -128,7 +145,7 @@ impl Election for EtcdElection { // to confirm that it is a valid leader, because it is possible that the // election's lease expires. let res = election_client - .campaign(ELECTION_KEY, self.leader_value.clone(), lease_id) + .campaign(self.election_key(), self.leader_value.clone(), lease_id) .await .context(error::EtcdFailedSnafu)?; @@ -188,7 +205,7 @@ impl Election for EtcdElection { let res = self .client .election_client() - .leader(ELECTION_KEY) + .leader(self.election_key()) .await .context(error::EtcdFailedSnafu)?; let leader_value = res.kv().context(error::NoLeaderSnafu)?.value(); diff --git a/src/meta-srv/src/lock/etcd.rs b/src/meta-srv/src/lock/etcd.rs index f02a8a974428..f7d403f37949 100644 --- a/src/meta-srv/src/lock/etcd.rs +++ b/src/meta-srv/src/lock/etcd.rs @@ -25,10 +25,14 @@ use crate::error::Result; #[derive(Clone)] pub struct EtcdLock { client: Client, + store_key_prefix: Option, } impl EtcdLock { - pub async fn with_endpoints(endpoints: S) -> Result + pub async fn with_endpoints( + endpoints: S, + store_key_prefix: Option, + ) -> Result where E: AsRef, S: AsRef<[E]>, @@ -37,17 +41,30 @@ impl EtcdLock { .await .context(error::ConnectEtcdSnafu)?; - Self::with_etcd_client(client) + Self::with_etcd_client(client, store_key_prefix) } - pub fn with_etcd_client(client: Client) -> Result { - Ok(Arc::new(EtcdLock { client })) + pub fn with_etcd_client( + client: Client, + store_key_prefix: Option, + ) -> Result { + Ok(Arc::new(EtcdLock { + client, + store_key_prefix, + })) + } + + fn lock_key(&self, key: Vec) -> Vec { + match &self.store_key_prefix { + Some(prefix) => format!("{}{}", prefix, String::from_utf8_lossy(&key)).into_bytes(), + None => key, + } } } #[async_trait::async_trait] impl DistLock for EtcdLock { - async fn lock(&self, name: Vec, opts: Opts) -> Result> { + async fn lock(&self, key: Vec, opts: Opts) -> Result> { let expire = opts.expire_secs.unwrap_or(DEFAULT_EXPIRE_TIME_SECS) as i64; let mut client = self.client.clone(); @@ -61,7 +78,7 @@ impl DistLock for EtcdLock { let lock_opts = LockOptions::new().with_lease(lease_id); let resp = client - .lock(name, Some(lock_opts)) + .lock(self.lock_key(key), Some(lock_opts)) .await .context(error::LockSnafu)?; @@ -70,7 +87,10 @@ impl DistLock for EtcdLock { async fn unlock(&self, key: Vec) -> Result<()> { let mut client = self.client.clone(); - let _ = client.unlock(key).await.context(error::UnlockSnafu)?; + let _ = client + .unlock(self.lock_key(key)) + .await + .context(error::UnlockSnafu)?; Ok(()) } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index faa192ece95e..613d282449c9 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -75,6 +75,7 @@ pub struct MetaSrvOptions { pub data_home: String, pub wal: WalConfig, pub export_metrics: ExportMetricsOption, + pub store_key_prefix: Option, } impl Default for MetaSrvOptions { @@ -101,6 +102,7 @@ impl Default for MetaSrvOptions { data_home: METASRV_HOME.to_string(), wal: WalConfig::default(), export_metrics: ExportMetricsOption::default(), + store_key_prefix: None, } } } From 60f55d840b9f41d35488304c5d553fcecd0e1386 Mon Sep 17 00:00:00 2001 From: zyy17 Date: Mon, 25 Dec 2023 17:22:26 +0800 Subject: [PATCH 2/2] refactor: refine lock_key --- src/meta-srv/src/lock/etcd.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/meta-srv/src/lock/etcd.rs b/src/meta-srv/src/lock/etcd.rs index f7d403f37949..14f1e9c615ff 100644 --- a/src/meta-srv/src/lock/etcd.rs +++ b/src/meta-srv/src/lock/etcd.rs @@ -56,7 +56,11 @@ impl EtcdLock { fn lock_key(&self, key: Vec) -> Vec { match &self.store_key_prefix { - Some(prefix) => format!("{}{}", prefix, String::from_utf8_lossy(&key)).into_bytes(), + Some(prefix) => { + let mut prefix = prefix.as_bytes().to_vec(); + prefix.extend_from_slice(&key); + prefix + } None => key, } }