Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 3, 2023
1 parent 2144786 commit 74f9bb0
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl MetaSrvBuilder {

let state = Arc::new(RwLock::new(match election {
None => State::leader(options.server_addr.to_string(), true),
Some(_) => State::new(options.server_addr.to_string()),
Some(_) => State::follower(options.server_addr.to_string()),
}));

let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
Expand Down
23 changes: 18 additions & 5 deletions src/meta-srv/src/service/store/cached_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ use common_meta::kv_backend::txn::{Txn, TxnOp, TxnRequest, TxnResponse};
use common_meta::kv_backend::{
KvBackend, KvBackendRef, ResettableKvBackend, ResettableKvBackendRef, TxnService,
};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use futures::TryStreamExt;

use crate::metrics;
use crate::state::State;
Expand Down Expand Up @@ -94,13 +96,24 @@ impl LeaderCachedKvBackend {
for prefix in &CACHE_KEY_PREFIXES[..] {
let _timer = metrics::METRIC_META_LEADER_CACHED_KV_LOAD.with_label_values(&[prefix]);

let result = self
.store
.range(RangeRequest::new().with_prefix(prefix.as_bytes()))
.await?;
// TODO(weny): Refactors PaginationStream's ouput to unary output.
let stream = PaginationStream::new(
self.store.clone(),
RangeRequest::new().with_prefix(prefix.as_bytes()),
DEFAULT_PAGE_SIZE,
Arc::new(|kv| Ok((kv, ()))),
);

let kvs = stream
.try_collect::<Vec<_>>()
.await?
.into_iter()
.map(|(kv, _)| kv)
.collect();

self.cache
.batch_put(BatchPutRequest {
kvs: result.kvs,
kvs,
prev_kv: false,
})
.await?;
Expand Down
4 changes: 2 additions & 2 deletions src/meta-srv/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub struct FollowerState {
}

impl State {
pub fn new(server_addr: String) -> State {
pub fn follower(server_addr: String) -> State {
Self::Follower(FollowerState { server_addr })
}

Expand Down Expand Up @@ -117,7 +117,7 @@ mod tests {

#[tokio::test]
async fn test_next_state() {
let mut state = State::new("test".to_string());
let mut state = State::follower("test".to_string());

state.next_state(become_leader(false));

Expand Down

0 comments on commit 74f9bb0

Please sign in to comment.