Skip to content

Commit

Permalink
feat: add '--store-key-prefix' to support multiple meta instances can…
Browse files Browse the repository at this point in the history
… use the same etcd backend
  • Loading branch information
zyy17 committed Dec 25, 2023
1 parent 06fd7fd commit 5f0d5cf
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 15 deletions.
6 changes: 6 additions & 0 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ struct StartCommand {
/// The working home directory of this metasrv instance.
#[clap(long)]
data_home: Option<String>,

/// If it's not empty, the metasrv will store all data with this key prefix.
#[clap(long)]
store_key_prefix: Option<String>,
}

impl StartCommand {
Expand Down Expand Up @@ -173,6 +177,8 @@ impl StartCommand {
opts.data_home = data_home.clone();
}

opts.store_key_prefix = self.store_key_prefix.clone();

// Disable dashboard in metasrv.
opts.http.disable_dashboard = true;

Expand Down
25 changes: 22 additions & 3 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
Expand Down Expand Up @@ -189,10 +190,28 @@ pub async fn metasrv_builder(
),
(None, false) => {
let etcd_client = create_etcd_client(opts).await?;
let kv_backend = {
let etcd_backend = EtcdStore::with_etcd_client(etcd_client.clone());
if let Some(prefix) = opts.store_key_prefix.clone() {
Arc::new(ChrootKvBackend::new(prefix.into_bytes(), etcd_backend))
} else {
etcd_backend
}
};
(
EtcdStore::with_etcd_client(etcd_client.clone()),
Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?),
Some(EtcdLock::with_etcd_client(etcd_client)?),
kv_backend,
Some(
EtcdElection::with_etcd_client(
&opts.server_addr,
etcd_client.clone(),
opts.store_key_prefix.clone(),
)
.await?,
),
Some(EtcdLock::with_etcd_client(
etcd_client,
opts.store_key_prefix.clone(),
)?),
)
}
};
Expand Down
27 changes: 22 additions & 5 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ pub struct EtcdElection {
is_leader: AtomicBool,
infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: Option<String>,
}

impl EtcdElection {
pub async fn with_endpoints<E, S>(leader_value: E, endpoints: S) -> Result<ElectionRef>
pub async fn with_endpoints<E, S>(
leader_value: E,
endpoints: S,
store_key_prefix: Option<String>,
) -> Result<ElectionRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
Expand All @@ -47,10 +52,14 @@ impl EtcdElection {
.await
.context(error::ConnectEtcdSnafu)?;

Self::with_etcd_client(leader_value, client).await
Self::with_etcd_client(leader_value, client, store_key_prefix).await
}

pub async fn with_etcd_client<E>(leader_value: E, client: Client) -> Result<ElectionRef>
pub async fn with_etcd_client<E>(
leader_value: E,
client: Client,
store_key_prefix: Option<String>,
) -> Result<ElectionRef>
where
E: AsRef<str>,
{
Expand Down Expand Up @@ -91,8 +100,16 @@ impl EtcdElection {
is_leader: AtomicBool::new(false),
infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
}))
}

fn election_key(&self) -> String {
match &self.store_key_prefix {
Some(prefix) => format!("{}{}", prefix, ELECTION_KEY),
None => ELECTION_KEY.to_string(),
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -128,7 +145,7 @@ impl Election for EtcdElection {
// to confirm that it is a valid leader, because it is possible that the
// election's lease expires.
let res = election_client
.campaign(ELECTION_KEY, self.leader_value.clone(), lease_id)
.campaign(self.election_key(), self.leader_value.clone(), lease_id)
.await
.context(error::EtcdFailedSnafu)?;

Expand Down Expand Up @@ -188,7 +205,7 @@ impl Election for EtcdElection {
let res = self
.client
.election_client()
.leader(ELECTION_KEY)
.leader(self.election_key())
.await
.context(error::EtcdFailedSnafu)?;
let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
Expand Down
34 changes: 27 additions & 7 deletions src/meta-srv/src/lock/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ use crate::error::Result;
#[derive(Clone)]
pub struct EtcdLock {
client: Client,
store_key_prefix: Option<String>,
}

impl EtcdLock {
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<DistLockRef>
pub async fn with_endpoints<E, S>(
endpoints: S,
store_key_prefix: Option<String>,
) -> Result<DistLockRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
Expand All @@ -37,17 +41,30 @@ impl EtcdLock {
.await
.context(error::ConnectEtcdSnafu)?;

Self::with_etcd_client(client)
Self::with_etcd_client(client, store_key_prefix)
}

pub fn with_etcd_client(client: Client) -> Result<DistLockRef> {
Ok(Arc::new(EtcdLock { client }))
pub fn with_etcd_client(
client: Client,
store_key_prefix: Option<String>,
) -> Result<DistLockRef> {
Ok(Arc::new(EtcdLock {
client,
store_key_prefix,
}))
}

fn lock_key(&self, key: Vec<u8>) -> Vec<u8> {
match &self.store_key_prefix {
Some(prefix) => format!("{}{}", prefix, String::from_utf8_lossy(&key)).into_bytes(),
None => key,
}
}
}

#[async_trait::async_trait]
impl DistLock for EtcdLock {
async fn lock(&self, name: Vec<u8>, opts: Opts) -> Result<Vec<u8>> {
async fn lock(&self, key: Vec<u8>, opts: Opts) -> Result<Vec<u8>> {
let expire = opts.expire_secs.unwrap_or(DEFAULT_EXPIRE_TIME_SECS) as i64;

let mut client = self.client.clone();
Expand All @@ -61,7 +78,7 @@ impl DistLock for EtcdLock {
let lock_opts = LockOptions::new().with_lease(lease_id);

let resp = client
.lock(name, Some(lock_opts))
.lock(self.lock_key(key), Some(lock_opts))
.await
.context(error::LockSnafu)?;

Expand All @@ -70,7 +87,10 @@ impl DistLock for EtcdLock {

async fn unlock(&self, key: Vec<u8>) -> Result<()> {
let mut client = self.client.clone();
let _ = client.unlock(key).await.context(error::UnlockSnafu)?;
let _ = client
.unlock(self.lock_key(key))
.await
.context(error::UnlockSnafu)?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct MetaSrvOptions {
pub data_home: String,
pub wal: WalConfig,
pub export_metrics: ExportMetricsOption,
pub store_key_prefix: Option<String>,
}

impl Default for MetaSrvOptions {
Expand All @@ -101,6 +102,7 @@ impl Default for MetaSrvOptions {
data_home: METASRV_HOME.to_string(),
wal: WalConfig::default(),
export_metrics: ExportMetricsOption::default(),
store_key_prefix: None,
}
}
}
Expand Down

0 comments on commit 5f0d5cf

Please sign in to comment.