Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add '--store-key-prefix' to support multiple meta instances can use the same etcd backend #2988

Merged
merged 2 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ struct StartCommand {
/// The working home directory of this metasrv instance.
#[clap(long)]
data_home: Option<String>,

/// If it's not empty, the metasrv will store all data with this key prefix.
#[clap(long)]
store_key_prefix: Option<String>,
}

impl StartCommand {
Expand Down Expand Up @@ -173,6 +177,8 @@ impl StartCommand {
opts.data_home = data_home.clone();
}

opts.store_key_prefix = self.store_key_prefix.clone();

// Disable dashboard in metasrv.
opts.http.disable_dashboard = true;

Expand Down
25 changes: 22 additions & 3 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
Expand Down Expand Up @@ -189,10 +190,28 @@ pub async fn metasrv_builder(
),
(None, false) => {
let etcd_client = create_etcd_client(opts).await?;
let kv_backend = {
let etcd_backend = EtcdStore::with_etcd_client(etcd_client.clone());
if let Some(prefix) = opts.store_key_prefix.clone() {
Arc::new(ChrootKvBackend::new(prefix.into_bytes(), etcd_backend))
} else {
etcd_backend
}
};
(
EtcdStore::with_etcd_client(etcd_client.clone()),
Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?),
Some(EtcdLock::with_etcd_client(etcd_client)?),
kv_backend,
Some(
EtcdElection::with_etcd_client(
&opts.server_addr,
etcd_client.clone(),
opts.store_key_prefix.clone(),
)
.await?,
),
Some(EtcdLock::with_etcd_client(
etcd_client,
opts.store_key_prefix.clone(),
)?),
)
}
};
Expand Down
27 changes: 22 additions & 5 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ pub struct EtcdElection {
is_leader: AtomicBool,
infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: Option<String>,
}

impl EtcdElection {
pub async fn with_endpoints<E, S>(leader_value: E, endpoints: S) -> Result<ElectionRef>
pub async fn with_endpoints<E, S>(
leader_value: E,
endpoints: S,
store_key_prefix: Option<String>,
) -> Result<ElectionRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
Expand All @@ -47,10 +52,14 @@ impl EtcdElection {
.await
.context(error::ConnectEtcdSnafu)?;

Self::with_etcd_client(leader_value, client).await
Self::with_etcd_client(leader_value, client, store_key_prefix).await
}

pub async fn with_etcd_client<E>(leader_value: E, client: Client) -> Result<ElectionRef>
pub async fn with_etcd_client<E>(
leader_value: E,
client: Client,
store_key_prefix: Option<String>,
) -> Result<ElectionRef>
where
E: AsRef<str>,
{
Expand Down Expand Up @@ -91,8 +100,16 @@ impl EtcdElection {
is_leader: AtomicBool::new(false),
infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
}))
}

fn election_key(&self) -> String {
match &self.store_key_prefix {
Some(prefix) => format!("{}{}", prefix, ELECTION_KEY),
None => ELECTION_KEY.to_string(),
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -128,7 +145,7 @@ impl Election for EtcdElection {
// to confirm that it is a valid leader, because it is possible that the
// election's lease expires.
let res = election_client
.campaign(ELECTION_KEY, self.leader_value.clone(), lease_id)
.campaign(self.election_key(), self.leader_value.clone(), lease_id)
.await
.context(error::EtcdFailedSnafu)?;

Expand Down Expand Up @@ -188,7 +205,7 @@ impl Election for EtcdElection {
let res = self
.client
.election_client()
.leader(ELECTION_KEY)
.leader(self.election_key())
.await
.context(error::EtcdFailedSnafu)?;
let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
Expand Down
38 changes: 31 additions & 7 deletions src/meta-srv/src/lock/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ use crate::error::Result;
#[derive(Clone)]
pub struct EtcdLock {
client: Client,
store_key_prefix: Option<String>,
}

impl EtcdLock {
pub async fn with_endpoints<E, S>(endpoints: S) -> Result<DistLockRef>
pub async fn with_endpoints<E, S>(
endpoints: S,
store_key_prefix: Option<String>,
) -> Result<DistLockRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
Expand All @@ -37,17 +41,34 @@ impl EtcdLock {
.await
.context(error::ConnectEtcdSnafu)?;

Self::with_etcd_client(client)
Self::with_etcd_client(client, store_key_prefix)
}

pub fn with_etcd_client(client: Client) -> Result<DistLockRef> {
Ok(Arc::new(EtcdLock { client }))
pub fn with_etcd_client(
client: Client,
store_key_prefix: Option<String>,
) -> Result<DistLockRef> {
Ok(Arc::new(EtcdLock {
client,
store_key_prefix,
}))
}

fn lock_key(&self, key: Vec<u8>) -> Vec<u8> {
match &self.store_key_prefix {
Some(prefix) => {
let mut prefix = prefix.as_bytes().to_vec();
prefix.extend_from_slice(&key);
prefix
}
None => key,
}
}
}

#[async_trait::async_trait]
impl DistLock for EtcdLock {
async fn lock(&self, name: Vec<u8>, opts: Opts) -> Result<Vec<u8>> {
async fn lock(&self, key: Vec<u8>, opts: Opts) -> Result<Vec<u8>> {
let expire = opts.expire_secs.unwrap_or(DEFAULT_EXPIRE_TIME_SECS) as i64;

let mut client = self.client.clone();
Expand All @@ -61,7 +82,7 @@ impl DistLock for EtcdLock {
let lock_opts = LockOptions::new().with_lease(lease_id);

let resp = client
.lock(name, Some(lock_opts))
.lock(self.lock_key(key), Some(lock_opts))
.await
.context(error::LockSnafu)?;

Expand All @@ -70,7 +91,10 @@ impl DistLock for EtcdLock {

async fn unlock(&self, key: Vec<u8>) -> Result<()> {
let mut client = self.client.clone();
let _ = client.unlock(key).await.context(error::UnlockSnafu)?;
let _ = client
.unlock(self.lock_key(key))
.await
.context(error::UnlockSnafu)?;
Ok(())
}
}
2 changes: 2 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct MetaSrvOptions {
pub data_home: String,
pub wal: WalConfig,
pub export_metrics: ExportMetricsOption,
pub store_key_prefix: Option<String>,
}

impl Default for MetaSrvOptions {
Expand All @@ -101,6 +102,7 @@ impl Default for MetaSrvOptions {
data_home: METASRV_HOME.to_string(),
wal: WalConfig::default(),
export_metrics: ExportMetricsOption::default(),
store_key_prefix: None,
}
}
}
Expand Down
Loading