Skip to content

Commit

Permalink
Merge pull request #7 from akasamq/reduce-write-syscall
Browse files Browse the repository at this point in the history
perf: reduce write syscall
  • Loading branch information
TheWaWaR authored Nov 14, 2023
2 parents 4658661 + 61e10c4 commit 54cd466
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 33 deletions.
4 changes: 2 additions & 2 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ FlashMQ : 250k connections, 0.9GB memory
VerneMQ : 50k connections, 20GB memory

# Message/s
FlashMQ : 40k coonections, 600k message/s, 1.0GB memory, CPU 550%
Akasa : 35k connections, 500k message/s, 2.3GB memory, CPU 650%
FlashMQ : 40k coonections, 600k message/s, 0.6GB memory, CPU 550%
Akasa : 40k connections, 600k message/s, 0.8GB memory, CPU 580%
EMQX : 20k connections, 300k message/s, 3.2GB memory, CPU 3000%
VerneMQ : 25k connections, 370k message/s, 6.0GB memory, CPU 2600%
```
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ FlashMQ : 250k connections, 0.9GB memory
VerneMQ : 50k connections, 20GB memory

# Message/s
FlashMQ : 40k coonections, 600k message/s, 1.0GB memory, CPU 550%
Akasa : 35k connections, 500k message/s, 2.3GB memory, CPU 650%
FlashMQ : 40k coonections, 600k message/s, 0.6GB memory, CPU 550%
Akasa : 40k connections, 600k message/s, 0.8GB memory, CPU 580%

This comment has been minimized.

Copy link
@yangby-cryptape

yangby-cryptape Nov 14, 2023

Wonderful!
Such an amazing update, it really shocked me!

EMQX : 20k connections, 300k message/s, 3.2GB memory, CPU 3000%
VerneMQ : 25k connections, 370k message/s, 6.0GB memory, CPU 2600%
```
Expand Down
72 changes: 53 additions & 19 deletions akasa-core/src/protocols/mqtt/online_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use mqtt_proto::{v3, v5, GenericPollPacket, GenericPollPacketState, PollHeader,
use crate::hook::{handle_request, Hook, HookAction, HookRequest, HookResponse};
use crate::state::{ClientId, ClientReceiver, ControlMessage, GlobalState, NormalMessage};

const WRITE_BATCH_SIZE: usize = 2048;

pub struct OnlineLoop<'a, C, S, H, Hk>
where
S: OnlineSession,
Expand Down Expand Up @@ -77,8 +79,8 @@ where
packet_state,
read_unfinish: false,
normal_stream_unfinish: false,
write_packets_max: 4,
write_packets: VecDeque::with_capacity(4),
write_packets_max: 16,
write_packets: VecDeque::with_capacity(16),
session_state_sender: None,
hook_fut: None,
}
Expand Down Expand Up @@ -215,7 +217,7 @@ where
// Read data from client connection
// * Produce to: [write_packets, broadcast_packets, external_request]
loop {
if write_packets.len() >= *write_packets_max
if too_much_write_data(write_packets, *write_packets_max)
|| session.broadcast_packets_cnt() >= session.broadcast_packets_max()
{
*read_unfinish = true;
Expand Down Expand Up @@ -325,7 +327,7 @@ where
// * Produce to: [write_packets]
// FIXME: drop message when pending queue are full
loop {
if write_packets.len() >= *write_packets_max {
if too_much_write_data(write_packets, *write_packets_max) {
*normal_stream_unfinish = true;
break;
} else {
Expand Down Expand Up @@ -370,33 +372,52 @@ where

// Write packets to client connection
// * Consume: [write_packets]
while let Some(write_packet) = write_packets.pop_front() {
log::trace!("[{}] encode packet: {:?}", current_client_id, write_packet);
let (data, mut idx) = match write_packet {
WritePacket::Packet(pkt) => match pkt.encode() {
Ok(data) => (data, 0),
Err(err) => return Poll::Ready(Some(err)),
},
WritePacket::Data((data, idx)) => (data, idx),
};
match Pin::new(&mut *conn).poll_write(cx, data.as_ref()) {
while !write_packets.is_empty() {
let (mut data_all, mut data_idx) = (Vec::new(), 0);
while let Some(write_packet) = write_packets.pop_front() {
log::trace!("[{}] encode packet: {:?}", current_client_id, write_packet);
match write_packet {
// NOTE: this must be the first item
WritePacket::Data((data, idx)) => {
data_all = match data {
VarBytes::Dynamic(d) => d,
VarBytes::Fixed2(d) => d.to_vec(),
VarBytes::Fixed4(d) => d.to_vec(),
};
data_idx = idx;
}
WritePacket::Packet(pkt) => match pkt.encode() {
Ok(data) => data_all.extend(data.as_ref()),
Err(err) => return Poll::Ready(Some(err)),
},
}
// NOTE: For avoid potential memory leak
if data_all.len() >= WRITE_BATCH_SIZE {
break;
}
}

match Pin::new(&mut *conn).poll_write(cx, &data_all[data_idx..]) {
Poll::Ready(Ok(size)) => {
have_write = true;
log::trace!("[{}] write {} bytes data", current_client_id, size);
idx += size;
if idx < data.as_ref().len() {
write_packets.push_front(WritePacket::Data((data, idx)));
have_write = true;
data_idx += size;
if data_idx < data_all.len() {
write_packets
.push_front(WritePacket::Data((VarBytes::Dynamic(data_all), data_idx)));
break;
}
}
Poll::Ready(Err(err)) => return Poll::Ready(Some(err)),
Poll::Pending => {
write_packets.push_front(WritePacket::Data((data, idx)));
write_packets
.push_front(WritePacket::Data((VarBytes::Dynamic(data_all), data_idx)));
pendings.write = true;
break;
}
}
}

if have_write
&& write_packets.capacity() > (*write_packets_max) * 2
&& write_packets.len() <= *write_packets_max
Expand Down Expand Up @@ -557,6 +578,19 @@ where
}
}

fn too_much_write_data<P>(
write_packets: &VecDeque<WritePacket<P>>,
write_packets_max: usize,
) -> bool {
if write_packets.len() >= write_packets_max {
true
} else if let Some(WritePacket::Data((VarBytes::Dynamic(data), _))) = write_packets.front() {
data.len() >= WRITE_BATCH_SIZE
} else {
false
}
}

#[derive(Debug, Clone, Copy, Default)]
struct Pendings {
// producer
Expand Down
18 changes: 13 additions & 5 deletions akasa-core/src/tests/protocols/mqtt/v3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,21 @@ trait ClientV3 {
#[async_trait]
impl ClientV3 for MockConnControl {
fn try_read_packet(&mut self) -> Result<Packet, TryRecvError> {
self.chan_out
.try_recv()
.map(|data| Packet::decode(&data).unwrap().unwrap())
self.chan_out.try_recv().map(|data| {
self.recv_data_buf.extend(data);
let packet = Packet::decode(&self.recv_data_buf).unwrap().unwrap();
self.recv_data_buf = self.recv_data_buf.split_off(packet.encode_len().unwrap());
packet
})
}
async fn read_packet(&mut self) -> Packet {
let data = self.chan_out.recv().await.unwrap();
Packet::decode(&data).unwrap().unwrap()
if self.recv_data_buf.is_empty() {
let data = self.chan_out.recv().await.unwrap();
self.recv_data_buf.extend(data);
}
let packet = Packet::decode(&self.recv_data_buf).unwrap().unwrap();
self.recv_data_buf = self.recv_data_buf.split_off(packet.encode_len().unwrap());
packet
}
async fn write_packet(&self, packet: Packet) {
self.write_data(packet.encode().unwrap().as_ref().to_vec())
Expand Down
18 changes: 13 additions & 5 deletions akasa-core/src/tests/protocols/mqtt/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,21 @@ trait ClientV5 {
#[async_trait]
impl ClientV5 for MockConnControl {
fn try_read_packet(&mut self) -> Result<Packet, TryRecvError> {
self.chan_out
.try_recv()
.map(|data| Packet::decode(&data).unwrap().unwrap())
self.chan_out.try_recv().map(|data| {
self.recv_data_buf.extend(data);
let packet = Packet::decode(&self.recv_data_buf).unwrap().unwrap();
self.recv_data_buf = self.recv_data_buf.split_off(packet.encode_len().unwrap());
packet
})
}
async fn read_packet(&mut self) -> Packet {
let data = self.chan_out.recv().await.unwrap();
Packet::decode(&data).unwrap().unwrap()
if self.recv_data_buf.is_empty() {
let data = self.chan_out.recv().await.unwrap();
self.recv_data_buf.extend(data);
}
let packet = Packet::decode(&self.recv_data_buf).unwrap().unwrap();
self.recv_data_buf = self.recv_data_buf.split_off(packet.encode_len().unwrap());
packet
}
async fn write_packet(&self, packet: Packet) {
self.write_data(packet.encode().unwrap().as_ref().to_vec())
Expand Down
2 changes: 2 additions & 0 deletions akasa-core/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl GlobalState {
pub struct MockConnControl {
pub chan_in: Sender<Vec<u8>>,
pub chan_out: Receiver<Vec<u8>>,
pub recv_data_buf: Vec<u8>,
pub global: Arc<GlobalState>,
}

Expand All @@ -70,6 +71,7 @@ impl MockConn {
let control = MockConnControl {
chan_in: in_tx,
chan_out: out_rx,
recv_data_buf: Vec::new(),
global,
};
(conn, control)
Expand Down

0 comments on commit 54cd466

Please sign in to comment.