Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/gateway message processing #1991

Merged
merged 14 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/traits/src/liquidity_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pub trait MessageProcessor {
/// Process a message.
fn process(msg: Self::Message) -> (DispatchResult, Weight);

/// Max weight that processing a message can take
/// Max weight that processing a message can take.
fn max_processing_weight(msg: &Self::Message) -> Weight;
}

Expand Down
85 changes: 59 additions & 26 deletions pallets/liquidity-pools-gateway-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use parity_scale_codec::FullCodec;
use scale_info::TypeInfo;
use sp_arithmetic::traits::BaseArithmetic;
use sp_runtime::traits::{EnsureAddAssign, One};
use sp_std::vec::Vec;

#[cfg(test)]
mod mock;
Expand Down Expand Up @@ -61,6 +60,12 @@ pub mod pallet {
#[pallet::getter(fn message_nonce_store)]
pub type MessageNonceStore<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage that is used for keeping track of the last nonce that was
/// processed.
#[pallet::storage]
#[pallet::getter(fn last_processed_nonce)]
pub type LastProcessedNonce<T: Config> = StorageValue<_, T::MessageNonce, ValueQuery>;

/// Storage for messages that will be processed during the `on_idle` hook.
#[pallet::storage]
#[pallet::getter(fn message_queue)]
Expand Down Expand Up @@ -93,6 +98,11 @@ pub mod pallet {
message: T::Message,
error: DispatchError,
},

/// Maximum number of messages was reached.
MaxNumberOfMessagesReached {
last_processed_nonce: T::MessageNonce,
},
}

#[pallet::error]
Expand Down Expand Up @@ -200,11 +210,43 @@ pub mod pallet {
}

fn service_message_queue(max_weight: Weight) -> Weight {
let mut weight_used = Weight::zero();
let mut last_processed_nonce = LastProcessedNonce::<T>::get();

// 1 read for the last processed nonce
let mut weight_used = T::DbWeight::get().reads(1);

loop {
if last_processed_nonce.ensure_add_assign(One::one()).is_err() {
Self::deposit_event(Event::<T>::MaxNumberOfMessagesReached {
last_processed_nonce,
});

break;
}

let mut processed_entries = Vec::new();
// 1 read for the nonce
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

if last_processed_nonce > MessageNonceStore::<T>::get() {
break;
}

// 1 read for the message
weight_used.saturating_accrue(T::DbWeight::get().reads(1));

let message = match MessageQueue::<T>::get(last_processed_nonce) {
Some(msg) => msg,
// No message found at this nonce, we can skip it.
None => {
LastProcessedNonce::<T>::set(last_processed_nonce);

// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(1));

continue;
}
};

for (nonce, message) in MessageQueue::<T>::iter() {
let remaining_weight = max_weight.saturating_sub(weight_used);
let next_weight = T::MessageProcessor::max_processing_weight(&message);

Expand All @@ -213,37 +255,28 @@ pub mod pallet {
break;
}

let weight = match Self::process_message_and_deposit_event(nonce, message.clone()) {
(Ok(()), weight) => {
// Extra weight breakdown:
//
// 1 read for the message
// 1 write for the message removal
weight.saturating_add(T::DbWeight::get().reads_writes(1, 1))
}
let processing_weight = match Self::process_message_and_deposit_event(
last_processed_nonce,
message.clone(),
) {
(Ok(()), weight) => weight,
(Err(e), weight) => {
FailedMessageQueue::<T>::insert(nonce, (message, e));
FailedMessageQueue::<T>::insert(last_processed_nonce, (message, e));

// Extra weight breakdown:
//
// 1 read for the message
// 1 write for the failed message
// 1 write for the message removal
weight.saturating_add(T::DbWeight::get().reads_writes(1, 2))
weight.saturating_add(T::DbWeight::get().writes(1))
}
};

processed_entries.push(nonce);
weight_used.saturating_accrue(processing_weight);

weight_used = weight_used.saturating_add(weight);
MessageQueue::<T>::remove(last_processed_nonce);

if weight_used.all_gte(max_weight) {
cdamian marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}
LastProcessedNonce::<T>::set(last_processed_nonce);

for entry in processed_entries {
MessageQueue::<T>::remove(entry);
// 1 write for removing the message
// 1 write for setting the last processed nonce
weight_used.saturating_accrue(T::DbWeight::get().writes(2));
}

weight_used
Expand Down
69 changes: 65 additions & 4 deletions pallets/liquidity-pools-gateway-queue/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sp_runtime::{traits::BadOrigin, DispatchError};

use crate::{
mock::{new_test_ext, Processor, Queue, Runtime, RuntimeEvent as MockEvent, RuntimeOrigin},
Error, Event, FailedMessageQueue, MessageQueue,
Error, Event, FailedMessageQueue, LastProcessedNonce, MessageQueue,
};

mod utils {
Expand Down Expand Up @@ -181,7 +181,10 @@ mod process_failed_message {
}

mod message_queue_impl {
use sp_arithmetic::ArithmeticError::Overflow;

use super::*;
use crate::MessageNonceStore;

#[test]
fn success() {
Expand All @@ -197,6 +200,17 @@ mod message_queue_impl {
event_exists(Event::<Runtime>::MessageSubmitted { nonce, message })
});
}

#[test]
fn error_on_max_nonce() {
new_test_ext().execute_with(|| {
let message = 1;

MessageNonceStore::<Runtime>::set(u64::MAX);

assert_noop!(Queue::queue(message), Overflow);
});
}
}

mod on_idle {
Expand All @@ -209,7 +223,7 @@ mod on_idle {
#[test]
fn success_all() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -220,13 +234,14 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn not_all_messages_fit_in_the_block() {
new_test_ext().execute_with(|| {
(1..=5).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=5).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));
Expand All @@ -245,13 +260,14 @@ mod on_idle {
assert_eq!(weight, PROCESS_WEIGHT);
assert_eq!(handle.times(), 5);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 5)
});
}

#[test]
fn with_failed_messages() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| MessageQueue::<Runtime>::insert(i as u64, i * 10));
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|msg| match msg {
Expand All @@ -265,6 +281,51 @@ mod on_idle {
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 1);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn with_no_messages() {
new_test_ext().execute_with(|| {
let _ = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(LastProcessedNonce::<Runtime>::get(), 0)
});
}

#[test]
fn with_skipped_message_nonce() {
new_test_ext().execute_with(|| {
(1..=3).for_each(|i| Queue::queue(i * 10).unwrap());

Processor::mock_max_processing_weight(|_| PROCESS_LIMIT_WEIGHT);
let handle = Processor::mock_process(|_| (Ok(()), PROCESS_WEIGHT));

// Manually process the 2nd nonce, the on_idle hook should skip it and process
// the remaining nonces.
assert_ok!(Queue::process_message(RuntimeOrigin::signed(1), 2));

let weight = Queue::on_idle(0, TOTAL_WEIGHT);

assert_eq!(weight, PROCESS_WEIGHT * 2);
assert_eq!(handle.times(), 3);
assert_eq!(MessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(FailedMessageQueue::<Runtime>::iter().count(), 0);
assert_eq!(LastProcessedNonce::<Runtime>::get(), 3)
});
}

#[test]
fn max_messages() {
new_test_ext().execute_with(|| {
LastProcessedNonce::<Runtime>::set(u64::MAX);

let _ = Queue::on_idle(0, TOTAL_WEIGHT);

event_exists(Event::<Runtime>::MaxNumberOfMessagesReached {
last_processed_nonce: u64::MAX,
})
});
}
}
61 changes: 26 additions & 35 deletions pallets/liquidity-pools-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ use cfg_traits::liquidity_pools::{
OutboundMessageHandler, RouterProvider,
};
use cfg_types::domain_address::{Domain, DomainAddress};
use frame_support::{dispatch::DispatchResult, pallet_prelude::*};
use frame_support::{
dispatch::DispatchResult,
pallet_prelude::*,
storage::{with_transaction, TransactionOutcome},
};
use frame_system::pallet_prelude::{ensure_signed, OriginFor};
use message::GatewayMessage;
use orml_traits::GetByKey;
pub use pallet::*;
use parity_scale_codec::FullCodec;
use sp_arithmetic::traits::{BaseArithmetic, EnsureAddAssign, One};
use sp_runtime::SaturatedConversion;
use sp_std::convert::TryInto;

use crate::{
Expand Down Expand Up @@ -469,8 +472,8 @@ pub mod pallet {
router_ids.iter().any(|x| x == &router_id),
Error::<T>::UnknownRouter
);
// Message recovery shouldn't be supported for setups that have less than 1
// router since no proofs are required in that case.
// Message recovery shouldn't be supported for setups that have less than 2
// routers since no proofs are required in that case.
ensure!(router_ids.len() > 1, Error::<T>::NotEnoughRoutersForDomain);

let session_id = SessionIdStore::<T>::get();
Expand Down Expand Up @@ -621,45 +624,33 @@ pub mod pallet {
type Message = GatewayMessage<T::Message, T::RouterId>;

fn process(msg: Self::Message) -> (DispatchResult, Weight) {
match msg {
GatewayMessage::Inbound {
domain_address,
message,
router_id,
} => {
let mut counter = 0;

let res = Self::process_inbound_message(
// The #[transactional] macro only works for functions that return a
// `DispatchResult` therefore, we need to manually add this here.
Comment on lines +627 to +628
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Any reason for going with the overhead of with_transaction and branching over the result instead of just using #[transactional] and returning Ok(())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean here - do you mean changing the return of process to DispatchResult?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't use the #[transactional] macro in this function, that is not supported. We can either change the signature as I suggested in my previous message, or go with the current approach.

let res = with_transaction(|| {
let res = match msg {
GatewayMessage::Inbound {
domain_address,
message,
router_id,
&mut counter,
);

let weight = match counter {
0 => LP_DEFENSIVE_WEIGHT / 10,
n => LP_DEFENSIVE_WEIGHT.saturating_mul(n),
};
} => Self::process_inbound_message(domain_address, message, router_id),
GatewayMessage::Outbound { message, router_id } => {
T::MessageSender::send(router_id, T::Sender::get(), message)
}
};

(res, weight)
if res.is_ok() {
TransactionOutcome::Commit(res)
} else {
TransactionOutcome::Rollback(res)
}
GatewayMessage::Outbound { message, router_id } => {
let res = T::MessageSender::send(router_id, T::Sender::get(), message);
});

(res, LP_DEFENSIVE_WEIGHT)
}
}
(res, LP_DEFENSIVE_WEIGHT)
}

/// Returns the max processing weight for a message, based on its
/// direction.
fn max_processing_weight(msg: &Self::Message) -> Weight {
match msg {
GatewayMessage::Inbound { message, .. } => {
LP_DEFENSIVE_WEIGHT.saturating_mul(message.submessages().len().saturated_into())
}
GatewayMessage::Outbound { .. } => LP_DEFENSIVE_WEIGHT,
}
/// Returns the maximum weight for processing one message.
fn max_processing_weight(_: &Self::Message) -> Weight {
LP_DEFENSIVE_WEIGHT
}
}

Expand Down
Loading
Loading