Skip to content

Commit

Permalink
feat: add '--store-key-prefix' to support multiple meta instances can…
Browse files Browse the repository at this point in the history
… use the same etcd backend
  • Loading branch information
zyy17 committed Dec 25, 2023
1 parent 06fd7fd commit a7741aa
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 7 deletions.
6 changes: 6 additions & 0 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ struct StartCommand {
/// The working home directory of this metasrv instance.
#[clap(long)]
data_home: Option<String>,

/// If it's not empty, the metasrv will store all data with this key prefix.
#[clap(long)]
store_key_prefix: Option<String>,
}

impl StartCommand {
Expand Down Expand Up @@ -173,6 +177,8 @@ impl StartCommand {
opts.data_home = data_home.clone();
}

opts.store_key_prefix = self.store_key_prefix.clone();

// Disable dashboard in metasrv.
opts.http.disable_dashboard = true;

Expand Down
20 changes: 18 additions & 2 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
Expand Down Expand Up @@ -189,9 +190,24 @@ pub async fn metasrv_builder(
),
(None, false) => {
let etcd_client = create_etcd_client(opts).await?;
let kv_backend = {
let etcd_backend = EtcdStore::with_etcd_client(etcd_client.clone());
if let Some(prefix) = opts.store_key_prefix.clone() {
Arc::new(ChrootKvBackend::new(prefix.into_bytes(), etcd_backend))
} else {
etcd_backend
}
};
(
EtcdStore::with_etcd_client(etcd_client.clone()),
Some(EtcdElection::with_etcd_client(&opts.server_addr, etcd_client.clone()).await?),
kv_backend,
Some(
EtcdElection::with_etcd_client(
&opts.server_addr,
etcd_client.clone(),
opts.store_key_prefix.clone(),
)
.await?,
),
Some(EtcdLock::with_etcd_client(etcd_client)?),
)
}
Expand Down
27 changes: 22 additions & 5 deletions src/meta-srv/src/election/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@ pub struct EtcdElection {
is_leader: AtomicBool,
infancy: AtomicBool,
leader_watcher: broadcast::Sender<LeaderChangeMessage>,
store_key_prefix: Option<String>,
}

impl EtcdElection {
pub async fn with_endpoints<E, S>(leader_value: E, endpoints: S) -> Result<ElectionRef>
pub async fn with_endpoints<E, S>(
leader_value: E,
endpoints: S,
store_key_prefix: Option<String>,
) -> Result<ElectionRef>
where
E: AsRef<str>,
S: AsRef<[E]>,
Expand All @@ -47,10 +52,14 @@ impl EtcdElection {
.await
.context(error::ConnectEtcdSnafu)?;

Self::with_etcd_client(leader_value, client).await
Self::with_etcd_client(leader_value, client, store_key_prefix).await
}

pub async fn with_etcd_client<E>(leader_value: E, client: Client) -> Result<ElectionRef>
pub async fn with_etcd_client<E>(
leader_value: E,
client: Client,
store_key_prefix: Option<String>,
) -> Result<ElectionRef>
where
E: AsRef<str>,
{
Expand Down Expand Up @@ -91,8 +100,16 @@ impl EtcdElection {
is_leader: AtomicBool::new(false),
infancy: AtomicBool::new(false),
leader_watcher: tx,
store_key_prefix,
}))
}

fn election_key(&self) -> String {
match &self.store_key_prefix {
Some(prefix) => format!("{}{}", prefix, ELECTION_KEY),
None => ELECTION_KEY.to_string(),
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -128,7 +145,7 @@ impl Election for EtcdElection {
// to confirm that it is a valid leader, because it is possible that the
// election's lease expires.
let res = election_client
.campaign(ELECTION_KEY, self.leader_value.clone(), lease_id)
.campaign(self.election_key(), self.leader_value.clone(), lease_id)
.await
.context(error::EtcdFailedSnafu)?;

Expand Down Expand Up @@ -188,7 +205,7 @@ impl Election for EtcdElection {
let res = self
.client
.election_client()
.leader(ELECTION_KEY)
.leader(self.election_key())
.await
.context(error::EtcdFailedSnafu)?;
let leader_value = res.kv().context(error::NoLeaderSnafu)?.value();
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct MetaSrvOptions {
pub data_home: String,
pub wal: WalConfig,
pub export_metrics: ExportMetricsOption,
pub store_key_prefix: Option<String>,
}

impl Default for MetaSrvOptions {
Expand All @@ -101,6 +102,7 @@ impl Default for MetaSrvOptions {
data_home: METASRV_HOME.to_string(),
wal: WalConfig::default(),
export_metrics: ExportMetricsOption::default(),
store_key_prefix: None,
}
}
}
Expand Down

0 comments on commit a7741aa

Please sign in to comment.