Skip to content

Commit

Permalink
Remove SEND_PROPOSAL_ACTIVE_REQUESTS and SEND_MESSAGE_ACTIVE_REQUESTS
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Oct 12, 2023
1 parent 61c3eac commit bfa56a3
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 26 deletions.
2 changes: 0 additions & 2 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ pub struct Status {
pub uncommitteds: usize,
pub merger_proposals: usize,
pub sending_raft_messages: isize,
pub active_send_proposal_grpc_requests: isize,
pub active_send_message_grpc_requests: isize,
pub peers: HashMap<u64, String>,
}

Expand Down
3 changes: 0 additions & 3 deletions src/raft_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use tonic::Request;
use crate::error::{Error, Result};
use crate::message::{Merger, Message, Proposals, RaftResponse, ReplyChan, Status};
use crate::raft::Store;
use crate::raft_server::{send_message_active_requests, send_proposal_active_requests};
use crate::raft_service::raft_service_client::RaftServiceClient;
use crate::raft_service::{connect, Message as RraftMessage, Proposal as RraftProposal, Query};
use crate::storage::{LogStore, MemStorage};
Expand Down Expand Up @@ -482,8 +481,6 @@ impl<S: Store + 'static> RaftNode<S> {
uncommitteds: self.uncommitteds.len(),
merger_proposals,
sending_raft_messages,
active_send_proposal_grpc_requests: send_proposal_active_requests(),
active_send_message_grpc_requests: send_message_active_requests(),
peers: self.peer_addrs(),
}
}
Expand Down
21 changes: 0 additions & 21 deletions src/raft_server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::net::SocketAddr;
use std::sync::atomic::{AtomicIsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use bincode::serialize;
use futures::channel::{mpsc, oneshot};
use futures::SinkExt;
use log::{info, warn};
use once_cell::sync::Lazy;
use prost::Message as _;
use tikv_raft::eraftpb::{ConfChange, Message as RaftMessage};
use tokio::time::timeout;
Expand Down Expand Up @@ -140,7 +138,6 @@ impl RaftService for RaftServer {
) -> Result<Response<raft_service::RaftResponse>, Status> {
let message = RaftMessage::decode(request.into_inner().inner.as_ref())
.map_err(|e| Status::invalid_argument(e.to_string()))?;
SEND_MESSAGE_ACTIVE_REQUESTS.fetch_add(1, Ordering::SeqCst);
let reply = match self.snd.clone().try_send(Message::Raft(Box::new(message))) {
Ok(()) => {
let response = RaftResponse::Ok;
Expand All @@ -150,15 +147,13 @@ impl RaftService for RaftServer {
}
Err(_) => Err(Status::unavailable("error for try send message")),
};
SEND_MESSAGE_ACTIVE_REQUESTS.fetch_sub(1, Ordering::SeqCst);
reply
}

async fn send_proposal(
&self,
req: Request<raft_service::Proposal>,
) -> Result<Response<raft_service::RaftResponse>, Status> {
SEND_PROPOSAL_ACTIVE_REQUESTS.fetch_add(1, Ordering::SeqCst);
let proposal = req.into_inner().inner;
let mut sender = self.snd.clone();
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -187,8 +182,6 @@ impl RaftService for RaftServer {
Err(Status::unavailable("error for try send message"))
}
};

SEND_PROPOSAL_ACTIVE_REQUESTS.fetch_sub(1, Ordering::SeqCst);
reply
}

Expand Down Expand Up @@ -230,17 +223,3 @@ impl RaftService for RaftServer {
Ok(Response::new(reply))
}
}

static SEND_PROPOSAL_ACTIVE_REQUESTS: Lazy<Arc<AtomicIsize>> =
Lazy::new(|| Arc::new(AtomicIsize::new(0)));

static SEND_MESSAGE_ACTIVE_REQUESTS: Lazy<Arc<AtomicIsize>> =
Lazy::new(|| Arc::new(AtomicIsize::new(0)));

pub fn send_proposal_active_requests() -> isize {
SEND_PROPOSAL_ACTIVE_REQUESTS.load(Ordering::SeqCst)
}

pub fn send_message_active_requests() -> isize {
SEND_MESSAGE_ACTIVE_REQUESTS.load(Ordering::SeqCst)
}

0 comments on commit bfa56a3

Please sign in to comment.