Skip to content

Commit

Permalink
Merge pull request #4 from akasamq/remove-hook-service
Browse files Browse the repository at this point in the history
feat: remove hook service
  • Loading branch information
TheWaWaR authored Oct 28, 2023
2 parents 9528307 + def54de commit 9b8100a
Show file tree
Hide file tree
Showing 13 changed files with 229 additions and 471 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

[workspace]
members = ["akasa-core", "akasa"]
resolver = "2"

[profile.release]
debug = true
Expand Down
10 changes: 5 additions & 5 deletions akasa-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ pub struct HookConfig {
impl Default for HookConfig {
fn default() -> HookConfig {
HookConfig {
enable_before_connect: false,
enable_after_connect: false,
enable_publish: false,
enable_subscribe: false,
enable_unsubscribe: false,
enable_before_connect: true,
enable_after_connect: true,
enable_publish: true,
enable_subscribe: true,
enable_unsubscribe: true,
}
}
}
Expand Down
131 changes: 21 additions & 110 deletions akasa-core/src/hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use flume::Receiver;
use mqtt_proto::{
QoS, QosPid, TopicFilter, TopicName, {v3, v5},
};
use thiserror::Error;
use tokio::sync::oneshot;

use crate::protocols::mqtt::v3::{
packet::{
Expand All @@ -32,7 +30,7 @@ use crate::protocols::mqtt::v5::{
Session as SessionV5,
};
use crate::protocols::mqtt::{OnlineSession, WritePacket};
use crate::state::{Executor, GlobalState};
use crate::state::GlobalState;

// TODO:
// [ ] add timer support
Expand Down Expand Up @@ -361,161 +359,100 @@ impl<S: OnlineSession> LockedHookContext<S> {
}
}

pub type HookReceipt = Result<Vec<HookAction>, Option<io::Error>>;
pub enum HookResponse {
Normal(Result<Vec<HookAction>, Option<io::Error>>),
BeforeConnect(io::Result<HookConnectCode>),
AfterConnect(io::Result<Vec<HookAction>>),
}

pub enum HookRequest {
// Shutdown,
V5BeforeConnect {
peer: SocketAddr,
connect: v5::Connect,
sender: oneshot::Sender<io::Result<HookConnectCode>>,
},
V5AfterConnect {
context: LockedHookContext<SessionV5>,
session_present: bool,
sender: oneshot::Sender<io::Result<Vec<HookAction>>>,
},
V5Publish {
context: LockedHookContext<SessionV5>,
encode_len: usize,
packet_body: Vec<MaybeUninit<u8>>,
publish: v5::Publish,
sender: oneshot::Sender<HookReceipt>,
},
V5Subscribe {
context: LockedHookContext<SessionV5>,
encode_len: usize,
packet_body: Vec<MaybeUninit<u8>>,
subscribe: v5::Subscribe,
sender: oneshot::Sender<HookReceipt>,
},
V5Unsubscribe {
context: LockedHookContext<SessionV5>,
encode_len: usize,
packet_body: Vec<MaybeUninit<u8>>,
unsubscribe: v5::Unsubscribe,
sender: oneshot::Sender<HookReceipt>,
},

V3BeforeConnect {
peer: SocketAddr,
connect: v3::Connect,
sender: oneshot::Sender<io::Result<HookConnectCode>>,
},
V3AfterConnect {
context: LockedHookContext<SessionV3>,
session_present: bool,
sender: oneshot::Sender<io::Result<Vec<HookAction>>>,
},
V3Publish {
context: LockedHookContext<SessionV3>,
encode_len: usize,
packet_body: Vec<MaybeUninit<u8>>,
publish: v3::Publish,
sender: oneshot::Sender<HookReceipt>,
},
V3Subscribe {
context: LockedHookContext<SessionV3>,
encode_len: usize,
packet_body: Vec<MaybeUninit<u8>>,
subscribe: v3::Subscribe,
sender: oneshot::Sender<HookReceipt>,
},
V3Unsubscribe {
context: LockedHookContext<SessionV3>,
encode_len: usize,
packet_body: Vec<MaybeUninit<u8>>,
unsubscribe: v3::Unsubscribe,
sender: oneshot::Sender<HookReceipt>,
},
}

#[derive(Clone)]
pub struct HookService<E: Clone, H: Clone> {
executor: E,
pub async fn handle_request<H: Hook + Send + Sync>(
request: HookRequest,
handler: H,
requests: Receiver<HookRequest>,
global: Arc<GlobalState>,
}

impl<E, H> HookService<E, H>
where
E: Executor + Clone,
H: Hook + Clone + Send + Sync + 'static,
{
pub fn new(
executor: E,
handler: H,
requests: Receiver<HookRequest>,
global: Arc<GlobalState>,
) -> HookService<E, H> {
HookService {
executor,
handler,
requests,
global,
}
}

pub async fn start(self) {
loop {
let request = match self.requests.recv_async().await {
Ok(request) => request,
Err(err) => {
log::error!(
"[executor#{}] receive hook request failed, error: {:?}",
self.executor.id(),
err
);
break;
}
};

let handler = self.handler.clone();
let global = Arc::clone(&self.global);
self.executor
.spawn_local(handle_request(request, handler, global));
}
}
}

async fn handle_request<H: Hook>(request: HookRequest, handler: H, global: Arc<GlobalState>) {
) -> HookResponse {
match request {
HookRequest::V5BeforeConnect {
peer,
connect,
sender,
} => {
HookRequest::V5BeforeConnect { peer, connect } => {
log::debug!("got a v5 before connect request: {peer}, {connect:#?}");
let result = handler
.v5_before_connect(peer, &connect)
.await
.map_err(Into::into);
if let Err(_err) = sender.send(result) {
log::debug!("v5 before connect response receiver is closed");
}
HookResponse::BeforeConnect(result)
}
HookRequest::V5AfterConnect {
context,
session_present,
sender,
} => {
let session = context.session_ref();
log::debug!("got a v5 after connect request: {}", session.client_id());
let result = handler
.v5_after_connect(session, session_present)
.await
.map_err(Into::into);
if let Err(_err) = sender.send(result) {
log::debug!("v5 after connect response receiver is closed");
}
HookResponse::AfterConnect(result)
}
HookRequest::V5Publish {
mut context,
encode_len,
packet_body,
mut publish,
sender,
} => {
log::debug!("got a v5 publish request: {publish:#?}");
let (session, write_packets) = context.get_mut();
Expand Down Expand Up @@ -570,16 +507,13 @@ async fn handle_request<H: Hook>(request: HookRequest, handler: H, global: Arc<G
}
Err(err) => Err(Some(err.into())),
};
if let Err(_err) = sender.send(receipt) {
log::error!("send publish hook ack error");
}
HookResponse::Normal(receipt)
}
HookRequest::V5Subscribe {
mut context,
encode_len,
packet_body,
mut subscribe,
sender,
} => {
let (session, write_packets) = context.get_mut();
let body: &[u8] = unsafe { mem::transmute(&packet_body[..]) };
Expand Down Expand Up @@ -619,16 +553,13 @@ async fn handle_request<H: Hook>(request: HookRequest, handler: H, global: Arc<G
}
Err(err) => Err(Some(err.into())),
};
if let Err(_err) = sender.send(receipt) {
log::error!("send publish hook ack error");
}
HookResponse::Normal(receipt)
}
HookRequest::V5Unsubscribe {
mut context,
encode_len,
packet_body,
mut unsubscribe,
sender,
} => {
let (session, write_packets) = context.get_mut();
let body: &[u8] = unsafe { mem::transmute(&packet_body[..]) };
Expand All @@ -654,46 +585,34 @@ async fn handle_request<H: Hook>(request: HookRequest, handler: H, global: Arc<G
}
Err(err) => Err(Some(err.into())),
};
if let Err(_err) = sender.send(receipt) {
log::error!("send publish hook ack error");
}
HookResponse::Normal(receipt)
}

HookRequest::V3BeforeConnect {
peer,
connect,
sender,
} => {
HookRequest::V3BeforeConnect { peer, connect } => {
log::debug!("got a v3 before connect request: {peer}, {connect:#?}");
let result = handler
.v3_before_connect(peer, &connect)
.await
.map_err(Into::into);
if let Err(_err) = sender.send(result) {
log::debug!("v3 before connect response receiver is closed");
}
HookResponse::BeforeConnect(result)
}
HookRequest::V3AfterConnect {
context,
session_present,
sender,
} => {
let session = context.session_ref();
log::debug!("got a v3 after connect request: {}", session.client_id());
let result = handler
.v3_after_connect(session, session_present)
.await
.map_err(Into::into);
if let Err(_err) = sender.send(result) {
log::debug!("v3 after connect response receiver is closed");
}
HookResponse::AfterConnect(result)
}
HookRequest::V3Publish {
mut context,
encode_len,
packet_body,
mut publish,
sender,
} => {
log::debug!("got a v3 publish request: {publish:#?}");
let (session, write_packets) = context.get_mut();
Expand Down Expand Up @@ -722,16 +641,13 @@ async fn handle_request<H: Hook>(request: HookRequest, handler: H, global: Arc<G
Ok(_) => Err(Some(io::ErrorKind::InvalidData.into())),
Err(err) => Err(Some(err.into())),
};
if let Err(_err) = sender.send(receipt) {
log::error!("send publish hook ack error");
}
HookResponse::Normal(receipt)
}
HookRequest::V3Subscribe {
mut context,
encode_len,
packet_body,
mut subscribe,
sender,
} => {
let (session, write_packets) = context.get_mut();
let body: &[u8] = unsafe { mem::transmute(&packet_body[..]) };
Expand Down Expand Up @@ -776,16 +692,13 @@ async fn handle_request<H: Hook>(request: HookRequest, handler: H, global: Arc<G
Ok(_code) => Err(Some(io::ErrorKind::InvalidData.into())),
Err(err) => Err(Some(err.into())),
};
if let Err(_err) = sender.send(receipt) {
log::error!("send publish hook ack error");
}
HookResponse::Normal(receipt)
}
HookRequest::V3Unsubscribe {
mut context,
encode_len,
packet_body,
mut unsubscribe,
sender,
} => {
let (session, write_packets) = context.get_mut();
let body: &[u8] = unsafe { mem::transmute(&packet_body[..]) };
Expand All @@ -806,9 +719,7 @@ async fn handle_request<H: Hook>(request: HookRequest, handler: H, global: Arc<G
Ok(_code) => Err(Some(io::ErrorKind::InvalidData.into())),
Err(err) => Err(Some(err.into())),
};
if let Err(_err) = sender.send(receipt) {
log::error!("send publish hook ack error");
}
HookResponse::Normal(receipt)
}
}
}
4 changes: 2 additions & 2 deletions akasa-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ mod tests;

pub use crate::config::Config;
pub use crate::hook::{
Hook, HookAction, HookConnectCode, HookError, HookPublishCode, HookRequest, HookResult,
HookService, HookSubscribeCode, HookUnsubscribeCode, PublishAction, SubscribeAction,
Hook, HookAction, HookConnectCode, HookError, HookPublishCode, HookRequest, HookResponse,
HookResult, HookSubscribeCode, HookUnsubscribeCode, PublishAction, SubscribeAction,
UnsubscribeAction,
};
pub use crate::protocols::mqtt::{
Expand Down
Loading

0 comments on commit 9b8100a

Please sign in to comment.