Skip to content

Commit

Permalink
Merge pull request #125 from Kuadrant/operation-dispatcher-err-handling
Browse files Browse the repository at this point in the history
Operation dispatcher err handling
  • Loading branch information
didierofrivia authored Nov 6, 2024
2 parents b7f334e + 5397dde commit 983261d
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 58 deletions.
2 changes: 1 addition & 1 deletion src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ impl TryFrom<PluginConfiguration> for FilterConfig {
}
}

#[derive(Deserialize, Debug, Clone, Default, PartialEq)]
#[derive(Deserialize, Debug, Copy, Clone, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FailureMode {
#[default]
Expand Down
70 changes: 52 additions & 18 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::configuration::action_set::ActionSet;
use crate::configuration::{FailureMode, FilterConfig};
use crate::operation_dispatcher::OperationDispatcher;
use crate::operation_dispatcher::{OperationDispatcher, OperationError};
use crate::service::GrpcService;
use log::{debug, warn};
use proxy_wasm::traits::{Context, HttpContext};
Expand Down Expand Up @@ -39,9 +39,13 @@ impl Filter {
"#{} action_set selected {}",
self.context_id, action_set.name
);
self.operation_dispatcher
if let Err(op_err) = self
.operation_dispatcher
.borrow_mut()
.build_operations(&action_set.actions);
.build_operations(&action_set.actions)
{
self.send_http_response(500, vec![], Some(format!("{op_err}").as_ref()));
}
} else {
debug!(
"#{} process_action_sets: no action_set with conditions applies",
Expand All @@ -50,22 +54,38 @@ impl Filter {
return Action::Continue;
}

if let Some(operation) = self.operation_dispatcher.borrow_mut().next() {
match operation.get_result() {
match self.operation_dispatcher.borrow_mut().next() {
Ok(Some(op)) => match op.get_result() {
Ok(call_id) => {
debug!("#{} initiated gRPC call (id# {})", self.context_id, call_id);
Action::Pause
}
Err(e) => {
warn!("gRPC call failed! {e:?}");
if let FailureMode::Deny = operation.get_failure_mode() {
if let FailureMode::Deny = op.get_failure_mode() {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
Action::Continue
}
},
Ok(None) => {
Action::Continue // No operations left to perform
}
Err(OperationError {
failure_mode: FailureMode::Deny,
status,
}) => {
warn!("OperationError Status: {status:?}");
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"));
Action::Continue
}
Err(OperationError {
failure_mode: FailureMode::Allow,
status,
}) => {
warn!("OperationError Status: {status:?}");
Action::Continue
}
} else {
Action::Continue
}
}
}
Expand Down Expand Up @@ -110,19 +130,33 @@ impl Context for Filter {
self.context_id
);

let some_op = self.operation_dispatcher.borrow().get_operation(token_id);
let op_res = self
.operation_dispatcher
.borrow()
.get_waiting_operation(token_id);

if let Some(operation) = some_op {
if GrpcService::process_grpc_response(operation, resp_size).is_ok() {
self.operation_dispatcher.borrow_mut().next();
if let Some(_op) = self.operation_dispatcher.borrow_mut().next() {
} else {
self.resume_http_request()
match op_res {
Ok(operation) => {
if GrpcService::process_grpc_response(operation, resp_size).is_ok() {
// call the next op
match self.operation_dispatcher.borrow_mut().next() {
Ok(some_op) => {
if some_op.is_none() {
// No more operations left in queue, resuming
self.resume_http_request();
}
}
Err(op_err) => {
// If desired, we could check the error status.
GrpcService::handle_error_on_grpc_response(op_err.failure_mode);
}
}
}
}
} else {
warn!("No Operation found with token_id: {token_id}");
GrpcService::handle_error_on_grpc_response(&FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode
Err(e) => {
warn!("No Operation found with token_id: {token_id}");
GrpcService::handle_error_on_grpc_response(e.failure_mode);
}
}
}
}
116 changes: 84 additions & 32 deletions src/operation_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use proxy_wasm::hostcalls;
use proxy_wasm::types::{Bytes, MapType, Status};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::rc::Rc;
use std::time::Duration;

Expand All @@ -31,10 +32,10 @@ impl State {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) struct Operation {
state: RefCell<State>,
result: RefCell<Result<u32, Status>>,
result: RefCell<Result<u32, OperationError>>,
service: Rc<Service>,
action: Action,
service_handler: Rc<GrpcServiceHandler>,
Expand Down Expand Up @@ -63,17 +64,22 @@ impl Operation {
}
}

fn trigger(&self) -> Result<u32, Status> {
fn trigger(&self) -> Result<u32, OperationError> {
if let Some(message) = (self.grpc_message_build_fn)(self.get_service_type(), &self.action) {
let res = self.service_handler.send(
self.get_map_values_bytes_fn,
self.grpc_call_fn,
message,
self.service.timeout.0,
);
self.set_result(res);
match res {
Ok(token_id) => self.set_result(Ok(token_id)),
Err(status) => {
self.set_result(Err(OperationError::new(status, self.get_failure_mode())))
}
}
self.next_state();
res
self.get_result()
} else {
self.done();
self.get_result()
Expand All @@ -92,20 +98,47 @@ impl Operation {
*self.state.borrow()
}

pub fn get_result(&self) -> Result<u32, Status> {
pub fn get_result(&self) -> Result<u32, OperationError> {
*self.result.borrow()
}

fn set_result(&self, result: Result<u32, Status>) {
fn set_result(&self, result: Result<u32, OperationError>) {
*self.result.borrow_mut() = result;
}

pub fn get_service_type(&self) -> &ServiceType {
&self.service.service_type
}

pub fn get_failure_mode(&self) -> &FailureMode {
&self.service.failure_mode
pub fn get_failure_mode(&self) -> FailureMode {
self.service.failure_mode
}
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct OperationError {
pub status: Status,
pub failure_mode: FailureMode,
}

impl OperationError {
fn new(status: Status, failure_mode: FailureMode) -> Self {
Self {
status,
failure_mode,
}
}
}

impl fmt::Display for OperationError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.status {
Status::ParseFailure => {
write!(f, "Error parsing configuration.")
}
_ => {
write!(f, "Error triggering the operation. {:?}", self.status)
}
}
}
}

Expand All @@ -124,30 +157,46 @@ impl OperationDispatcher {
}
}

pub fn get_operation(&self, token_id: u32) -> Option<Rc<Operation>> {
self.waiting_operations.get(&token_id).cloned()
pub fn get_waiting_operation(&self, token_id: u32) -> Result<Rc<Operation>, OperationError> {
let op = self.waiting_operations.get(&token_id);
match op {
Some(op) => {
op.next_state();
Ok(op.clone())
}
None => Err(OperationError::new(
Status::NotFound,
FailureMode::default(),
)),
}
}

pub fn build_operations(&mut self, actions: &[Action]) {
pub fn build_operations(&mut self, actions: &[Action]) -> Result<(), OperationError> {
let mut operations: Vec<Rc<Operation>> = vec![];
for action in actions.iter() {
// TODO(didierofrivia): Error handling
if let Some(service) = self.service_handlers.get(&action.service) {
operations.push(Rc::new(Operation::new(
service.get_service(),
action.clone(),
Rc::clone(service),
)))
} else {
error!("Unknown service: {}", action.service);
return Err(OperationError::new(
Status::ParseFailure,
Default::default(),
));
}
}
self.push_operations(operations);
Ok(())
}

pub fn push_operations(&mut self, operations: Vec<Rc<Operation>>) {
self.operations.extend(operations);
}

pub fn next(&mut self) -> Option<Rc<Operation>> {
pub fn next(&mut self) -> Result<Option<Rc<Operation>>, OperationError> {
if let Some((i, operation)) = self.operations.iter_mut().enumerate().next() {
match operation.get_state() {
State::Pending => {
Expand All @@ -162,14 +211,14 @@ impl OperationDispatcher {
// We index only if it was just transitioned to Waiting after triggering
self.waiting_operations.insert(token_id, operation.clone());
// TODO(didierofrivia): Decide on indexing the failed operations.
Some(operation.clone())
Ok(Some(operation.clone()))
}
State::Done => self.next(),
}
}
Err(status) => {
error!("{status:?}");
None
Err(err) => {
error!("{err:?}");
Err(err)
}
}
} else {
Expand All @@ -180,7 +229,7 @@ impl OperationDispatcher {
}
State::Waiting => {
operation.next_state();
Some(operation.clone())
Ok(Some(operation.clone()))
}
State::Done => {
if let Ok(token_id) = operation.get_result() {
Expand All @@ -191,7 +240,7 @@ impl OperationDispatcher {
}
}
} else {
None
Ok(None)
}
}

Expand Down Expand Up @@ -333,7 +382,7 @@ mod tests {

assert_eq!(operation.get_state(), State::Pending);
assert_eq!(*operation.get_service_type(), ServiceType::RateLimit);
assert_eq!(*operation.get_failure_mode(), FailureMode::Deny);
assert_eq!(operation.get_failure_mode(), FailureMode::Deny);
assert_eq!(operation.get_result(), Ok(0));
}

Expand Down Expand Up @@ -415,31 +464,34 @@ mod tests {
assert_eq!(operation_dispatcher.waiting_operations.len(), 0);

let mut op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(66));
assert_eq!(op.clone().unwrap().unwrap().get_result(), Ok(66));
assert_eq!(
*op.clone().unwrap().get_service_type(),
*op.clone().unwrap().unwrap().get_service_type(),
ServiceType::RateLimit
);
assert_eq!(op.unwrap().get_state(), State::Waiting);
assert_eq!(op.unwrap().unwrap().get_state(), State::Waiting);
assert_eq!(operation_dispatcher.waiting_operations.len(), 1);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(66));
assert_eq!(op.unwrap().get_state(), State::Done);
assert_eq!(op.clone().unwrap().unwrap().get_result(), Ok(66));
assert_eq!(op.unwrap().unwrap().get_state(), State::Done);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(77));
assert_eq!(*op.clone().unwrap().get_service_type(), ServiceType::Auth);
assert_eq!(op.unwrap().get_state(), State::Waiting);
assert_eq!(op.clone().unwrap().unwrap().get_result(), Ok(77));
assert_eq!(
*op.clone().unwrap().unwrap().get_service_type(),
ServiceType::Auth
);
assert_eq!(op.unwrap().unwrap().get_state(), State::Waiting);
assert_eq!(operation_dispatcher.waiting_operations.len(), 1);

op = operation_dispatcher.next();
assert_eq!(op.clone().unwrap().get_result(), Ok(77));
assert_eq!(op.unwrap().get_state(), State::Done);
assert_eq!(op.clone().unwrap().unwrap().get_result(), Ok(77));
assert_eq!(op.unwrap().unwrap().get_state(), State::Done);
assert_eq!(operation_dispatcher.waiting_operations.len(), 1);

op = operation_dispatcher.next();
assert!(op.is_none());
assert!(op.unwrap().is_none());
assert!(operation_dispatcher.get_current_operation_state().is_none());
assert_eq!(operation_dispatcher.waiting_operations.len(), 0);
}
Expand Down
Loading

0 comments on commit 983261d

Please sign in to comment.