Skip to content

Commit

Permalink
[refactor] Implementing OperationError
Browse files Browse the repository at this point in the history
Signed-off-by: dd di cesare <[email protected]>
  • Loading branch information
didierofrivia committed Oct 29, 2024
1 parent 5823350 commit f07915d
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ impl TryFrom<PluginConfiguration> for FilterConfig {
}
}

#[derive(Deserialize, Debug, Clone, Default, PartialEq)]
#[derive(Deserialize, Debug, Copy, Clone, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum FailureMode {
#[default]
Expand Down
20 changes: 12 additions & 8 deletions src/filter/http_context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::configuration::action_set::ActionSet;
use crate::configuration::{FailureMode, FilterConfig};
use crate::operation_dispatcher::OperationDispatcher;
use crate::operation_dispatcher::{OperationDispatcher, OperationError};
use crate::service::GrpcService;
use log::{debug, warn};
use proxy_wasm::traits::{Context, HttpContext};
Expand Down Expand Up @@ -65,12 +65,16 @@ impl Filter {
}
}
} else {
match res {
Err((Status::Empty, _)) => Action::Continue,
Err((_, failure_mode)) => {
if failure_mode == FailureMode::Deny {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
}
match res.unwrap_err() {
OperationError {
status: Status::Empty,
..
} => Action::Continue,
OperationError {
failure_mode: FailureMode::Deny,
..
} => {
self.send_http_response(500, vec![], Some(b"Internal Server Error.\n"));
Action::Continue
}
_ => Action::Continue,
Expand Down Expand Up @@ -131,7 +135,7 @@ impl Context for Filter {
}
} else {
warn!("No Operation found with token_id: {token_id}");
GrpcService::handle_error_on_grpc_response(&FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode
GrpcService::handle_error_on_grpc_response(FailureMode::Deny); // TODO(didierofrivia): Decide on what's the default failure mode
}
}
}
63 changes: 48 additions & 15 deletions src/operation_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use proxy_wasm::hostcalls;
use proxy_wasm::types::{Bytes, MapType, Status};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt;
use std::rc::Rc;
use std::time::Duration;

Expand All @@ -31,10 +32,10 @@ impl State {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub(crate) struct Operation {
state: RefCell<State>,
result: RefCell<Result<u32, Status>>,
result: RefCell<Result<u32, OperationError>>,
service: Rc<Service>,
action: Action,
service_handler: Rc<GrpcServiceHandler>,
Expand Down Expand Up @@ -63,17 +64,22 @@ impl Operation {
}
}

fn trigger(&self) -> Result<u32, Status> {
fn trigger(&self) -> Result<u32, OperationError> {
if let Some(message) = (self.grpc_message_build_fn)(self.get_service_type(), &self.action) {
let res = self.service_handler.send(
self.get_map_values_bytes_fn,
self.grpc_call_fn,
message,
self.service.timeout.0,
);
self.set_result(res);
match res {
Ok(token_id) => self.set_result(Ok(token_id)),
Err(status) => {
self.set_result(Err(OperationError::new(status, self.get_failure_mode())))
}
}
self.next_state();
res
self.get_result()
} else {
self.done();
self.get_result()
Expand All @@ -92,20 +98,47 @@ impl Operation {
*self.state.borrow()
}

pub fn get_result(&self) -> Result<u32, Status> {
pub fn get_result(&self) -> Result<u32, OperationError> {
*self.result.borrow()
}

fn set_result(&self, result: Result<u32, Status>) {
fn set_result(&self, result: Result<u32, OperationError>) {
*self.result.borrow_mut() = result;
}

pub fn get_service_type(&self) -> &ServiceType {
&self.service.service_type
}

pub fn get_failure_mode(&self) -> &FailureMode {
&self.service.failure_mode
pub fn get_failure_mode(&self) -> FailureMode {
self.service.failure_mode
}
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct OperationError {
pub status: Status,
pub failure_mode: FailureMode,
}

impl OperationError {
fn new(status: Status, failure_mode: FailureMode) -> Self {
Self {
status,
failure_mode,
}
}
}

impl fmt::Display for OperationError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.status {
Status::Empty => {
write!(f, "No more operations to perform ")
}
_ => {
write!(f, "Error triggering the operation. {:?}", self.status)
}
}
}
}

Expand Down Expand Up @@ -147,7 +180,7 @@ impl OperationDispatcher {
self.operations.extend(operations);
}

pub fn next(&mut self) -> Result<Rc<Operation>, (Status, FailureMode)> {
pub fn next(&mut self) -> Result<Rc<Operation>, OperationError> {
if let Some((i, operation)) = self.operations.iter_mut().enumerate().next() {
match operation.get_state() {
State::Pending => {
Expand All @@ -167,9 +200,9 @@ impl OperationDispatcher {
State::Done => self.next(),
}
}
Err(status) => {
error!("{status:?}");
Err((status, operation.get_failure_mode().clone()))
Err(err) => {
error!("{err:?}");
Err(err)
}
}
} else {
Expand All @@ -191,7 +224,7 @@ impl OperationDispatcher {
}
}
} else {
Err((Status::Empty, FailureMode::default())) // No more operations
Err(OperationError::new(Status::Empty, FailureMode::default())) // No more operations
}
}

Expand Down Expand Up @@ -333,7 +366,7 @@ mod tests {

assert_eq!(operation.get_state(), State::Pending);
assert_eq!(*operation.get_service_type(), ServiceType::RateLimit);
assert_eq!(*operation.get_failure_mode(), FailureMode::Deny);
assert_eq!(operation.get_failure_mode(), FailureMode::Deny);
assert_eq!(operation.get_result(), Ok(0));
}

Expand Down
8 changes: 4 additions & 4 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::cell::OnceCell;
use std::rc::Rc;
use std::time::Duration;

#[derive(Default)]
#[derive(Default, Debug)]
pub struct GrpcService {
service: Rc<Service>,
name: &'static str,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl GrpcService {
}
}

pub fn handle_error_on_grpc_response(failure_mode: &FailureMode) {
pub fn handle_error_on_grpc_response(failure_mode: FailureMode) {
match failure_mode {
FailureMode::Deny => {
hostcalls::send_http_response(500, vec![], Some(b"Internal Server Error.\n"))
Expand All @@ -105,7 +105,7 @@ pub type GetMapValuesBytesFn = fn(map_type: MapType, key: &str) -> Result<Option

pub type GrpcMessageBuildFn =
fn(service_type: &ServiceType, action: &Action) -> Option<GrpcMessageRequest>;

#[derive(Debug)]
pub struct GrpcServiceHandler {
grpc_service: Rc<GrpcService>,
header_resolver: Rc<HeaderResolver>,
Expand Down Expand Up @@ -148,7 +148,7 @@ impl GrpcServiceHandler {
Rc::clone(&self.grpc_service.service)
}
}

#[derive(Debug)]
pub struct HeaderResolver {
headers: OnceCell<Vec<(&'static str, Bytes)>>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/service/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl AuthService {

pub fn process_auth_grpc_response(
auth_resp: GrpcMessageResponse,
failure_mode: &FailureMode,
failure_mode: FailureMode,
) -> Result<(), StatusCode> {
if let GrpcMessageResponse::Auth(check_response) = auth_resp {
// store dynamic metadata in filter state
Expand Down
2 changes: 1 addition & 1 deletion src/service/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl RateLimitService {

pub fn process_ratelimit_grpc_response(
rl_resp: GrpcMessageResponse,
failure_mode: &FailureMode,
failure_mode: FailureMode,
) -> Result<(), StatusCode> {
match rl_resp {
GrpcMessageResponse::RateLimit(RateLimitResponse {
Expand Down

0 comments on commit f07915d

Please sign in to comment.