Skip to content
This repository has been archived by the owner on Dec 6, 2024. It is now read-only.

Commit

Permalink
feat: proposal how to remove boilerplate in wrappers
Browse files Browse the repository at this point in the history
  • Loading branch information
taco-paco committed Sep 18, 2024
1 parent 4c28612 commit 3698e90
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 157 deletions.
36 changes: 25 additions & 11 deletions crates/worker/src/clients/dynamodb_clients/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use aws_sdk_dynamodb::operation::scan::ScanOutput;
use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::Client;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::atomic::AtomicU8;
use std::sync::Arc;
use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder;
use tokio::sync::oneshot;
use types::item::{Item, Status};

Expand Down Expand Up @@ -42,13 +42,22 @@ impl DynamoDBClient {
match_result!(DBDeleteError, self.delete_item(id).await)
}

pub async fn update_item_raw(&self, update_item_builder: UpdateItemFluentBuilder) -> Result<(), DBUpdateError> {
let _ = update_item_builder.send().await;
pub async fn update_item_raw(
&self,
update_item_builder: &UpdateItemFluentBuilder,
) -> Result<(), DBUpdateError> {
let _ = update_item_builder.clone().send().await;
Ok(())
}

pub async fn update_item_raw_attempt(&self, update_item_builder: UpdateItemFluentBuilder) -> Result<Option<()>, DBUpdateError> {
match_result!(DBUpdateError, self.update_item_raw(update_item_builder).await)
pub async fn update_item_raw_attempt(
&self,
update_item_builder: &UpdateItemFluentBuilder,
) -> Result<Option<()>, DBUpdateError> {
match_result!(
DBUpdateError,
self.update_item_raw(update_item_builder).await
)
}

pub async fn update_item_status_conditional(
Expand Down Expand Up @@ -161,7 +170,7 @@ pub enum DynamoDBAction {
},
UpdateItemRaw {
update_item_builder: UpdateItemFluentBuilder,
sender: oneshot::Sender<Result<(), DBUpdateError>>
sender: oneshot::Sender<Result<(), DBUpdateError>>,
},
UpdateItemStatusConditional {
id: String,
Expand Down Expand Up @@ -203,11 +212,16 @@ impl ActionHandler for DynamoDBClient {
}
})
}
DynamoDBAction::UpdateItemRaw {update_item_builder, sender} => {
let result = self.update_item_raw_attempt(update_item_builder.clone()).await;
handle_action_result(result, sender, state).map(|sender| DynamoDBAction::UpdateItemRaw {
update_item_builder,
sender
DynamoDBAction::UpdateItemRaw {
update_item_builder,
sender,
} => {
let result = self.update_item_raw_attempt(&update_item_builder).await;
handle_action_result(result, sender, state).map(|sender| {
DynamoDBAction::UpdateItemRaw {
update_item_builder,
sender,
}
})
}
DynamoDBAction::UpdateItemStatusConditional {
Expand Down
167 changes: 54 additions & 113 deletions crates/worker/src/clients/dynamodb_clients/wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use aws_sdk_dynamodb::operation::scan::ScanOutput;
use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder;
use aws_sdk_dynamodb::types::AttributeValue;
use aws_sdk_dynamodb::Client;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::atomic::AtomicU8;
use std::sync::Arc;
use aws_sdk_dynamodb::operation::update_item::builders::UpdateItemFluentBuilder;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use types::item::{Item, Status};

use crate::clients::dynamodb_clients::client::{DynamoDBAction, DynamoDBClient};
use crate::clients::errors::{DBDeleteError, DBError, DBScanError, DBUpdateError};
use crate::clients::retriable::{Retrier, State};
use crate::clients::retriable::{execute_retriable_operation, Retrier, State};

#[derive(Clone)]
pub struct DynamoDBClientWrapper {
Expand All @@ -37,108 +37,61 @@ impl DynamoDBClientWrapper {
}

pub async fn delete_item(&self, id: &str) -> Result<(), DBDeleteError> {
match self.state.load(Ordering::Acquire) {
0 => match self.client.delete_item_attempt(id).await {
Ok(Some(val)) => return Ok(val),
Ok(None) => self
.state
.store(State::Reconnecting as u8, Ordering::Release),
Err(err) => return Err(err.into()),
},
1 => {}
_ => unreachable!(),
}
let operation = || self.client.delete_item_attempt(id);
let action_factory = |sender| DynamoDBAction::DeleteItem {
id: id.to_string(),
sender,
};

let (sender, receiver) = oneshot::channel();
self.actions_sender
.send(DynamoDBAction::DeleteItem {
id: id.to_string(),
sender,
})
.await;
receiver.await.unwrap()
// TODO: if all good. rewrite all other clients like that?
execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
.await
}

pub async fn get_item(&self, key: &str) -> Result<Option<Item>, DBError> {
match self.state.load(Ordering::Acquire) {
0 => match self.client.get_item_attempt(key).await {
Ok(Some(val)) => return Ok(val),
Ok(None) => self
.state
.store(State::Reconnecting as u8, Ordering::Release),
Err(err) => return Err(err.into()),
},
1 => {}
_ => unreachable!(),
}
let operation = || self.client.get_item_attempt(key);
let action_factory = |sender| DynamoDBAction::GetItem {
id: key.to_string(),
sender,
};

let (sender, receiver) = oneshot::channel();
self.actions_sender
.send(DynamoDBAction::GetItem {
id: key.to_string(),
sender,
})
.await;
receiver.await.unwrap()
execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
.await
}

pub async fn update_item_raw(&self, update_item_builder: UpdateItemFluentBuilder) -> Result<(), DBUpdateError> {
match self.state.load(Ordering::Acquire) {
0 => match self
.client
.update_item_raw_attempt(update_item_builder.clone())
.await
{
Ok(Some(val)) => return Ok(val),
Ok(None) => self
.state
.store(State::Reconnecting as u8, Ordering::Release),
Err(err) => return Err(err.into()),
},
1 => {}
_ => unreachable!(),
}
pub async fn update_item_raw(
&self,
update_item_builder: &UpdateItemFluentBuilder,
) -> Result<(), DBUpdateError> {
let operation = || self.client.update_item_raw_attempt(update_item_builder);

let action_factory = |sender| DynamoDBAction::UpdateItemRaw {
update_item_builder: update_item_builder.clone(),
sender,
};

let (sender, receiver) = oneshot::channel();
self.actions_sender
.send(DynamoDBAction::UpdateItemRaw {
update_item_builder,
sender
})
.await;
receiver.await.unwrap()
execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
.await
}

pub async fn scan_items_prior_to(
&self,
time: &DateTime<Utc>,
exclusive_start_key: &Option<HashMap<String, AttributeValue>>,
) -> Result<ScanOutput, DBScanError> {
match self.state.load(Ordering::Acquire) {
0 => match self
.client
let operation = || {
self.client
.scan_items_prior_to_attempt(time, exclusive_start_key)
.await
{
Ok(Some(val)) => return Ok(val),
Ok(None) => self
.state
.store(State::Reconnecting as u8, Ordering::Release),
Err(err) => return Err(err.into()),
},
1 => {}
_ => unreachable!(),
}
};

let (sender, receiver) = oneshot::channel();
self.actions_sender
.send(DynamoDBAction::ScanPriorTo {
time: time.clone(),
exclusive_start_key: exclusive_start_key.clone(),
sender,
})
.await;
receiver.await.unwrap()
let action_factory = |sender| DynamoDBAction::ScanPriorTo {
time: time.clone(),
exclusive_start_key: exclusive_start_key.clone(),
sender,
};

execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
.await
}

pub async fn update_item_status_conditional(
Expand All @@ -147,31 +100,19 @@ impl DynamoDBClientWrapper {
from: &Status,
to: &Status,
) -> Result<(), DBUpdateError> {
match self.state.load(Ordering::Acquire) {
0 => match self
.client
let operation = || {
self.client
.update_item_status_conditional_attempt(id, from, to)
.await
{
Ok(Some(val)) => return Ok(val),
Ok(None) => self
.state
.store(State::Reconnecting as u8, Ordering::Release),
Err(err) => return Err(err.into()),
},
1 => {}
_ => unreachable!(),
}
};

let action_factory = |sender| DynamoDBAction::UpdateItemStatusConditional {
id: id.to_string(),
from: from.clone(),
to: to.clone(),
sender,
};

let (sender, receiver) = oneshot::channel();
self.actions_sender
.send(DynamoDBAction::UpdateItemStatusConditional {
id: id.to_string(),
from: from.clone(),
to: to.clone(),
sender,
})
.await;
receiver.await.unwrap()
execute_retriable_operation(operation, action_factory, &self.actions_sender, &self.state)
.await
}
}
81 changes: 55 additions & 26 deletions crates/worker/src/clients/retriable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,6 @@ pub trait ActionHandler {
async fn handle(&self, action: Self::Action, state: Arc<AtomicU8>) -> Option<Self::Action>;
}

pub(crate) fn handle_action_result<T, E>(
result: Result<Option<T>, E>,
sender: oneshot::Sender<Result<T, E>>,
state: Arc<AtomicU8>,
) -> Option<oneshot::Sender<Result<T, E>>>
where
T: Send + 'static,
E: Send + 'static,
{
match result {
Ok(Some(val)) => {
state.store(State::Connected as u8, Ordering::Release);
let _ = sender.send(Ok(val));
None
}
Err(err) => {
let _ = sender.send(Err(err));
None
}
Ok(None) => {
state.store(State::Reconnecting as u8, Ordering::Release);
Some(sender)
}
}
}
pub enum State {
Connected = 0,
Reconnecting = 1,
Expand All @@ -89,7 +64,7 @@ impl<T: ActionHandler> Retrier<T> {

pub async fn start(mut self) {
const SLEEP_DURATION: Duration = Duration::from_secs(3);
// add lru instead
// TODO: introduce limit
let mut pending_actions = vec![];

loop {
Expand Down Expand Up @@ -138,3 +113,57 @@ impl<T: ActionHandler> Retrier<T> {
pending_actions.truncate(pivot);
}
}

pub(crate) fn handle_action_result<T, E>(
result: Result<Option<T>, E>,
sender: oneshot::Sender<Result<T, E>>,
state: Arc<AtomicU8>,
) -> Option<oneshot::Sender<Result<T, E>>>
where
T: Send + 'static,
E: Send + 'static,
{
match result {
Ok(Some(val)) => {
state.store(State::Connected as u8, Ordering::Release);
let _ = sender.send(Ok(val));
None
}
Err(err) => {
let _ = sender.send(Err(err));
None
}
Ok(None) => {
state.store(State::Reconnecting as u8, Ordering::Release);
Some(sender)
}
}
}

pub(crate) async fn execute_retriable_operation<F, Fut, AFactory, A, T, E>(
operation: F,
action_factory: AFactory,
action_sender: &mpsc::Sender<A>,
state: &AtomicU8,
) -> Result<T, E>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<Option<T>, E>>,
AFactory: Fn(oneshot::Sender<Result<T, E>>) -> A,
{
match state.load(Ordering::Acquire) {
0 => match operation().await {
Ok(Some(val)) => return Ok(val),
Ok(None) => state.store(State::Reconnecting as u8, Ordering::Release),
Err(err) => return Err(err.into()),
},
1 => {}
_ => unreachable!(),
}

let (sender, receiver) = oneshot::channel();
let action = action_factory(sender);
action_sender.send(action).await;

receiver.await.unwrap() // TODO: remove unwrap
}
Loading

0 comments on commit 3698e90

Please sign in to comment.