Skip to content

Commit

Permalink
方法:join(.., node_addr, ..)增加node_addr参数,表示本节点地址:端口
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Sep 6, 2024
1 parent f0ac64b commit f399e46
Showing 1 changed file with 9 additions and 10 deletions.
19 changes: 9 additions & 10 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,20 +242,20 @@ pub struct Raft<S: Store + 'static> {
store: S,
tx: mpsc::Sender<Message>,
rx: mpsc::Receiver<Message>,
addr: SocketAddr,
laddr: SocketAddr,
logger: slog::Logger,
cfg: Arc<Config>,
}

impl<S: Store + Send + Sync + 'static> Raft<S> {
/// creates a new node with the given address and store.
pub fn new<A: ToSocketAddrs>(
addr: A,
laddr: A,
store: S,
logger: slog::Logger,
cfg: Config,
) -> Result<Self> {
let addr = addr
let laddr = laddr
.to_socket_addrs()?
.next()
.ok_or_else(|| Error::from("None"))?;
Expand All @@ -265,7 +265,7 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
store,
tx,
rx,
addr,
laddr,
logger,
cfg,
})
Expand Down Expand Up @@ -352,7 +352,7 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
self.cfg.clone(),
)?;

let server = RaftServer::new(self.tx, self.addr, self.cfg.clone());
let server = RaftServer::new(self.tx, self.laddr, self.cfg.clone());
let server_handle = async {
if let Err(e) = server.run().await {
warn!("raft server run error: {:?}", e);
Expand All @@ -376,11 +376,10 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
Ok(())
}

/// Tries to join a new cluster at `addr`, getting an id from the leader, or finding it if
/// `addr` is not the current leader of the cluster
pub async fn join(
self,
node_id: u64,
node_addr: String,
leader_id: Option<u64>,
leader_addr: String,
) -> Result<()> {
Expand All @@ -405,7 +404,7 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
)?;
let peer = node.add_peer(&leader_addr, leader_id);
let mut client = peer.client().await?;
let server = RaftServer::new(self.tx, self.addr, self.cfg.clone());
let server = RaftServer::new(self.tx, self.laddr, self.cfg.clone());
let server_handle = async {
if let Err(e) = server.run().await {
warn!("raft server run error: {:?}", e);
Expand Down Expand Up @@ -438,8 +437,8 @@ impl<S: Store + Send + Sync + 'static> Raft<S> {
let mut change = ConfChange::default();
change.set_node_id(node_id);
change.set_change_type(ConfChangeType::AddNode);
change.set_context(serialize(&self.addr.to_string())?);
// change.set_context(serialize(&self.addr)?);
change.set_context(serialize(&node_addr)?);
// change.set_context(serialize(&node_addr)?);

let change = RiteraftConfChange {
inner: ConfChange::encode_to_vec(&change),
Expand Down

0 comments on commit f399e46

Please sign in to comment.