Skip to content

Commit

Permalink
feat: query table meta by ids (#2675)
Browse files Browse the repository at this point in the history
* feat: add table meta by id

* feat: add help for http api

* chore: by comment

* feat: display for LeaderChangeMessage
  • Loading branch information
fengjiachun authored Nov 2, 2023
1 parent 4b48c71 commit 479ffe5
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 124 deletions.
34 changes: 34 additions & 0 deletions src/common/meta/src/key/table_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
Expand All @@ -21,6 +23,7 @@ use crate::error::Result;
use crate::key::{to_removed_key, TableMetaKey};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::BatchGetRequest;
use crate::table_name::TableName;

pub struct TableInfoKey {
Expand Down Expand Up @@ -233,6 +236,37 @@ impl TableInfoManager {
.map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value))
.transpose()
}

pub async fn batch_get(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, TableInfoValue>> {
let lookup_table = table_ids
.iter()
.map(|id| (TableInfoKey::new(*id).as_raw_key(), id))
.collect::<HashMap<_, _>>();

let resp = self
.kv_backend
.batch_get(BatchGetRequest {
keys: lookup_table.keys().cloned().collect::<Vec<_>>(),
})
.await?;

let values = resp
.kvs
.iter()
.map(|kv| {
Ok((
// Safety: must exist.
**lookup_table.get(kv.key()).unwrap(),
TableInfoValue::try_from_raw_value(&kv.value)?,
))
})
.collect::<Result<HashMap<_, _>>>()?;

Ok(values)
}
}

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_telemetry::info;
use etcd_client::Client;
use servers::configurator::ConfiguratorRef;
use servers::http::{HttpServer, HttpServerBuilder};
Expand Down Expand Up @@ -134,6 +135,8 @@ pub async fn bootstrap_meta_srv_with_router(
.await
.context(error::TcpBindSnafu { addr: bind_addr })?;

info!("gRPC server is bound to: {bind_addr}");

let incoming =
TcpIncoming::from_listener(listener, true, None).context(error::TcpIncomingSnafu)?;

Expand Down
22 changes: 22 additions & 0 deletions src/meta-srv/src/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub mod etcd;

use std::fmt;
use std::sync::Arc;

use etcd_client::LeaderKey;
Expand All @@ -29,6 +30,27 @@ pub enum LeaderChangeMessage {
StepDown(Arc<LeaderKey>),
}

impl fmt::Display for LeaderChangeMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let leader_key = match self {
LeaderChangeMessage::Elected(leader_key) => {
write!(f, "Elected(")?;
leader_key
}
LeaderChangeMessage::StepDown(leader_key) => {
write!(f, "StepDown(")?;
leader_key
}
};
write!(f, "LeaderKey {{ ")?;
write!(f, "name: {}", String::from_utf8_lossy(leader_key.name()))?;
write!(f, ", key: {}", String::from_utf8_lossy(leader_key.key()))?;
write!(f, ", rev: {}", leader_key.rev())?;
write!(f, ", lease: {}", leader_key.lease())?;
write!(f, " }})")
}
}

#[async_trait::async_trait]
pub trait Election: Send + Sync {
type Leader;
Expand Down
5 changes: 1 addition & 4 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,7 @@ impl MetaSrv {
Ok(msg) => {
in_memory.reset();
leader_cached_kv_store.reset();
info!(
"Leader's cache has bean cleared on leader change: {:?}",
msg
);
info!("Leader's cache has bean cleared on leader change: {msg}");
match msg {
LeaderChangeMessage::Elected(_) => {
state_handler.on_become_leader().await;
Expand Down
64 changes: 32 additions & 32 deletions src/meta-srv/src/service/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
},
);

let router = router.route(
"/heartbeat",
heartbeat::HeartBeatHandler {
meta_peer_client: meta_srv.meta_peer_client().clone(),
},
);
let handler = heartbeat::HeartBeatHandler {
meta_peer_client: meta_srv.meta_peer_client().clone(),
};
let router = router
.route("/heartbeat", handler.clone())
.route("/heartbeat/help", handler);

let router = router.route(
"/catalogs",
Expand All @@ -56,26 +56,26 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
},
);

let router = router.route(
"/schemas",
meta::SchemasHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);

let router = router.route(
"/tables",
meta::TablesHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);

let router = router.route(
"/table",
meta::TableHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);
let handler = meta::SchemasHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
};
let router = router
.route("/schemas", handler.clone())
.route("/schemas/help", handler);

let handler = meta::TablesHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
};
let router = router
.route("/tables", handler.clone())
.route("/tables/help", handler);

let handler = meta::TableHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
};
let router = router
.route("/table", handler.clone())
.route("/table/help", handler);

let router = router.route(
"/leader",
Expand All @@ -84,12 +84,12 @@ pub fn make_admin_service(meta_srv: MetaSrv) -> Admin {
},
);

let router = router.route(
"/route",
route::RouteHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
},
);
let handler = route::RouteHandler {
table_metadata_manager: meta_srv.table_metadata_manager().clone(),
};
let router = router
.route("/route", handler.clone())
.route("/route/help", handler);

let router = router.route(
"/inactive-regions/view",
Expand Down
14 changes: 12 additions & 2 deletions src/meta-srv/src/service/admin/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use tonic::codegen::http;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, Result};
use crate::keys::StatValue;
use crate::service::admin::HttpHandler;
use crate::service::admin::{util, HttpHandler};

#[derive(Clone)]
pub struct HeartBeatHandler {
pub meta_peer_client: MetaPeerClientRef,
}
Expand All @@ -31,9 +32,18 @@ pub struct HeartBeatHandler {
impl HttpHandler for HeartBeatHandler {
async fn handle(
&self,
_: &str,
path: &str,
params: &HashMap<String, String>,
) -> Result<http::Response<String>> {
if path.ends_with("/help") {
return util::to_text_response(
r#"
- GET /heartbeat
- GET /heartbeat?addr=127.0.0.1:3001
"#,
);
}

let stat_kvs = self.meta_peer_client.get_all_dn_stat_kvs().await?;
let mut stat_vals: Vec<StatValue> = stat_kvs.into_values().collect();

Expand Down
Loading

0 comments on commit 479ffe5

Please sign in to comment.