Skip to content

Commit

Permalink
[refactor] No more ops means None, not Err
Browse files Browse the repository at this point in the history
Signed-off-by: dd di cesare <[email protected]>
  • Loading branch information
didierofrivia committed Oct 29, 2024
1 parent fd7ed38 commit e991d6a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 38 deletions.
35 changes: 15 additions & 20 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::operation_dispatcher::{OperationDispatcher, OperationError};
use crate::service::GrpcService;
use log::{debug, warn};
use proxy_wasm::traits::{Context, HttpContext};
use proxy_wasm::types::{Action, Status};
use proxy_wasm::types::Action;
use std::cell::RefCell;
use std::rc::Rc;

Expand Down Expand Up @@ -53,37 +53,32 @@ impl Filter {
);
return Action::Continue;
}
let res = self.operation_dispatcher.borrow_mut().next();
if let Ok(operation) = res {
match operation.get_result() {

match self.operation_dispatcher.borrow_mut().next() {
Ok(Some(op)) => match op.get_result() {
Ok(call_id) => {
debug!("#{} initiated gRPC call (id# {})", self.context_id, call_id);
Action::Pause
}
Err(e) => {
warn!("gRPC call failed! {e:?}");
if let FailureMode::Deny = operation.get_failure_mode() {
if let FailureMode::Deny = op.get_failure_mode() {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
Action::Continue
}
},
Ok(None) => {
Action::Continue // No operations left to perform
}
} else {
let op_err = res.unwrap_err();
match op_err {
OperationError {
status: Status::Empty,
..
} => Action::Continue,
OperationError {
failure_mode: FailureMode::Deny,
..
} => {
self.send_http_response(500, vec![], Some(format!("{op_err}").as_ref()));
Action::Continue
}
_ => Action::Continue,
Err(OperationError {
failure_mode: FailureMode::Deny,
..
}) => {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"));
Action::Continue
}
_ => Action::Continue,
}
}
}
Expand Down
36 changes: 18 additions & 18 deletions src/operation_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ impl OperationError {
impl fmt::Display for OperationError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.status {
Status::Empty => {
write!(f, "No more operations to perform.")
}
Status::ParseFailure => {
write!(f, "Error parsing configuration.")
}
Expand Down Expand Up @@ -199,7 +196,7 @@ impl OperationDispatcher {
self.operations.extend(operations);
}

pub fn next(&mut self) -> Result<Rc<Operation>, OperationError> {
pub fn next(&mut self) -> Result<Option<Rc<Operation>>, OperationError> {
if let Some((i, operation)) = self.operations.iter_mut().enumerate().next() {
match operation.get_state() {
State::Pending => {
Expand All @@ -214,7 +211,7 @@ impl OperationDispatcher {
// We index only if it was just transitioned to Waiting after triggering
self.waiting_operations.insert(token_id, operation.clone());
// TODO(didierofrivia): Decide on indexing the failed operations.
Ok(operation.clone())
Ok(Some(operation.clone()))
}
State::Done => self.next(),
}
Expand All @@ -232,7 +229,7 @@ impl OperationDispatcher {
}
State::Waiting => {
operation.next_state();
Ok(operation.clone())
Ok(Some(operation.clone()))
}
State::Done => {
if let Ok(token_id) = operation.get_result() {
Expand All @@ -243,7 +240,7 @@ impl OperationDispatcher {
}
}
} else {
Err(OperationError::new(Status::Empty, FailureMode::default())) // No more operations
Ok(None)
}
}

Expand Down Expand Up @@ -467,31 +464,34 @@ mod tests {
assert_eq!(operation_dispatcher.waiting_operations.len(), 0);

let mut op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(66));
assert_eq!(op.clone().unwrap().unwrap().get_result(), Ok(66));
assert_eq!(
*op.clone().unwrap().get_service_type(),
*op.clone().unwrap().unwrap().get_service_type(),
ServiceType::RateLimit
);
assert_eq!(op.unwrap().get_state(), State::Waiting);
assert_eq!(op.unwrap().unwrap().get_state(), State::Waiting);
assert_eq!(operation_dispatcher.waiting_operations.len(), 1);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(66));
assert_eq!(op.unwrap().get_state(), State::Done);
assert_eq!(op.clone().unwrap().unwrap().get_result(), Ok(66));
assert_eq!(op.unwrap().unwrap().get_state(), State::Done);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(77));
assert_eq!(*op.clone().unwrap().get_service_type(), ServiceType::Auth);
assert_eq!(op.unwrap().get_state(), State::Waiting);
assert_eq!(op.clone().unwrap().unwrap().get_result(), Ok(77));
assert_eq!(
*op.clone().unwrap().unwrap().get_service_type(),
ServiceType::Auth
);
assert_eq!(op.unwrap().unwrap().get_state(), State::Waiting);
assert_eq!(operation_dispatcher.waiting_operations.len(), 1);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(77));
assert_eq!(op.unwrap().get_state(), State::Done);
assert_eq!(op.clone().unwrap().unwrap().get_result(), Ok(77));
assert_eq!(op.unwrap().unwrap().get_state(), State::Done);
assert_eq!(operation_dispatcher.waiting_operations.len(), 1);

op = operation_dispatcher.next();
assert!(op.is_err());
assert!(op.unwrap().is_none());
assert!(operation_dispatcher.get_current_operation_state().is_none());
assert_eq!(operation_dispatcher.waiting_operations.len(), 0);
}
Expand Down

0 comments on commit e991d6a

Please sign in to comment.