Fix/gateway message processing #1991
Changes in the first file:
```diff
@@ -35,7 +35,11 @@ use cfg_traits::liquidity_pools::{
     OutboundMessageHandler, RouterProvider,
 };
 use cfg_types::domain_address::{Domain, DomainAddress};
-use frame_support::{dispatch::DispatchResult, pallet_prelude::*};
+use frame_support::{
+    dispatch::{DispatchResult, PostDispatchInfo},
+    pallet_prelude::*,
+    storage::{with_transaction, TransactionOutcome},
+};
 use frame_system::pallet_prelude::{ensure_signed, OriginFor};
 use message::GatewayMessage;
 use orml_traits::GetByKey;
```
```diff
@@ -460,7 +464,7 @@ pub mod pallet {
             domain_address: DomainAddress,
             message_hash: MessageHash,
             router_id: T::RouterId,
-        ) -> DispatchResult {
+        ) -> DispatchResultWithPostInfo {
             T::AdminOrigin::ensure_origin(origin)?;
 
             let router_ids = Self::get_router_ids_for_domain(domain_address.domain())?;
```
```diff
@@ -469,8 +473,8 @@ pub mod pallet {
                 router_ids.iter().any(|x| x == &router_id),
                 Error::<T>::UnknownRouter
             );
-            // Message recovery shouldn't be supported for setups that have less than 1
-            // router since no proofs are required in that case.
+            // Message recovery shouldn't be supported for setups that have less than 2
+            // routers since no proofs are required in that case.
             ensure!(router_ids.len() > 1, Error::<T>::NotEnoughRoutersForDomain);
 
             let session_id = SessionIdStore::<T>::get();
```
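The guard's arithmetic is worth spelling out. Below is a runnable standalone sketch of the rationale; the `routers - 1` proof-count rule is an assumption inferred from the comment ("no proofs are required" for a single router), not code taken from the pallet.

```rust
// Hypothetical model: one router delivers the message itself, every other
// router must deliver a proof, so a single-router setup expects zero proofs
// and has nothing to recover.
fn expected_proof_count(router_count: usize) -> usize {
    router_count.saturating_sub(1)
}

fn recovery_supported(router_count: usize) -> bool {
    // Mirrors `ensure!(router_ids.len() > 1, ...)` from the diff.
    router_count > 1
}

fn main() {
    assert_eq!(expected_proof_count(1), 0); // no proofs required
    assert!(!recovery_supported(1)); // hence recovery is rejected
    assert!(recovery_supported(3)); // multi-router setups qualify
}
```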
```diff
@@ -495,20 +499,26 @@ pub mod pallet {
 
             let expected_proof_count = Self::get_expected_proof_count(&router_ids)?;
 
+            let mut counter = 0u64;
+
             Self::execute_if_requirements_are_met(
                 message_hash,
                 &router_ids,
                 session_id,
                 expected_proof_count,
                 domain_address,
+                &mut counter,
             )?;
 
             Self::deposit_event(Event::<T>::MessageRecoveryExecuted {
                 message_hash,
                 router_id,
             });
 
-            Ok(())
+            Ok(PostDispatchInfo {
+                actual_weight: Some(Self::get_weight_for_batch_messages(counter)),
+                pays_fee: Pays::Yes,
+            })
         }
 
         /// Sends a message that initiates a message recovery using the
```
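For readers unfamiliar with `DispatchResultWithPostInfo`: returning a `PostDispatchInfo` whose `actual_weight` is `Some(..)` lets the runtime charge the caller for the work actually done rather than the worst-case estimate. A minimal standalone model of that refund rule, with FRAME's two-dimensional `Weight` simplified to a `u64`:

```rust
// Simplified stand-in for frame_support's PostDispatchInfo.
#[derive(Clone, Copy)]
struct PostDispatchInfo {
    // `None` means "charge the full pre-dispatch estimate".
    actual_weight: Option<u64>,
}

// FRAME charges the smaller of the estimate and the reported actual weight.
fn weight_to_charge(pre_dispatch_estimate: u64, post: PostDispatchInfo) -> u64 {
    match post.actual_weight {
        Some(actual) => actual.min(pre_dispatch_estimate),
        None => pre_dispatch_estimate,
    }
}

fn main() {
    let estimate = 1_000_000u64;
    // Only a couple of batched messages were processed, so less is charged.
    let post = PostDispatchInfo { actual_weight: Some(200_000) };
    assert_eq!(weight_to_charge(estimate, post), 200_000);
}
```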
```diff
@@ -587,6 +597,13 @@ pub mod pallet {
 
             T::MessageSender::send(messaging_router, T::Sender::get(), message)
         }
+
+        fn get_weight_for_batch_messages(count: u64) -> Weight {
+            match count {
+                0 => LP_DEFENSIVE_WEIGHT / 10,
+                n => LP_DEFENSIVE_WEIGHT.saturating_mul(n),
+            }
+        }
     }
 
     impl<T: Config> OutboundMessageHandler for Pallet<T> {
```
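A runnable sketch of the new weight rule, with `Weight` flattened to `u64` and a placeholder value standing in for `LP_DEFENSIVE_WEIGHT` (the real constant lives in the pallet): a zero count still charges a tenth of the defensive weight for the reads that did happen, and anything else scales linearly, saturating instead of overflowing.

```rust
// Placeholder value; the real LP_DEFENSIVE_WEIGHT is defined by the pallet.
const LP_DEFENSIVE_WEIGHT: u64 = 5_000_000_000;

fn get_weight_for_batch_messages(count: u64) -> u64 {
    match count {
        // Nothing executed (e.g. still waiting for proofs): flat small charge.
        0 => LP_DEFENSIVE_WEIGHT / 10,
        // One defensive unit per processed sub-message.
        n => LP_DEFENSIVE_WEIGHT.saturating_mul(n),
    }
}

fn main() {
    assert_eq!(get_weight_for_batch_messages(0), LP_DEFENSIVE_WEIGHT / 10);
    assert_eq!(get_weight_for_batch_messages(3), 3 * LP_DEFENSIVE_WEIGHT);
}
```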
```diff
@@ -621,34 +638,44 @@ pub mod pallet {
         type Message = GatewayMessage<T::Message, T::RouterId>;
 
         fn process(msg: Self::Message) -> (DispatchResult, Weight) {
-            match msg {
-                GatewayMessage::Inbound {
-                    domain_address,
-                    message,
-                    router_id,
-                } => {
-                    let mut counter = 0;
-
-                    let res = Self::process_inbound_message(
-                        domain_address,
-                        message,
-                        router_id,
-                        &mut counter,
-                    );
-
-                    let weight = match counter {
-                        0 => LP_DEFENSIVE_WEIGHT / 10,
-                        n => LP_DEFENSIVE_WEIGHT.saturating_mul(n),
-                    };
-
-                    (res, weight)
-                }
-                GatewayMessage::Outbound { message, router_id } => {
-                    let res = T::MessageSender::send(router_id, T::Sender::get(), message);
-
-                    (res, LP_DEFENSIVE_WEIGHT)
-                }
-            }
+            // The #[transactional] macro only works for functions that return a
+            // `DispatchResult` therefore, we need to manually add this here.
+            with_transaction(|| {
+                let (res, weight) = match msg {
+                    GatewayMessage::Inbound {
+                        domain_address,
+                        message,
+                        router_id,
+                    } => {
+                        let mut counter = 0;
+
+                        let res = Self::process_inbound_message(
+                            domain_address,
+                            message,
+                            router_id,
+                            &mut counter,
+                        );
+
+                        (res, Self::get_weight_for_batch_messages(counter))
+                    }
+                    GatewayMessage::Outbound { message, router_id } => {
+                        let res = T::MessageSender::send(router_id, T::Sender::get(), message);
+
+                        (res, LP_DEFENSIVE_WEIGHT)
+                    }
+                };
+
+                if res.is_ok() {
+                    TransactionOutcome::Commit(Ok::<(DispatchResult, Weight), DispatchError>((
+                        res, weight,
+                    )))
+                } else {
+                    TransactionOutcome::Rollback(Ok::<(DispatchResult, Weight), DispatchError>((
+                        res, weight,
+                    )))
+                }
+            })
+            .expect("success is ensured by the transaction closure")
         }
 
         /// Returns the max processing weight for a message, based on its
```

Review thread on lines +627 to +628 (the new `#[transactional]` comment):

- Q: Any reason for going with the overhead of `with_transaction`?
- Not sure what you mean here - do you mean changing the return of `process`?
- We can't use the `#[transactional]` macro here, since `process` does not return a plain `DispatchResult`.

Review thread on the `TransactionOutcome` block:

- Super NIT. I think adding the return type in the closure allows removing the explicit `Ok::<(DispatchResult, Weight), DispatchError>` annotations: `with_transaction(|| -> DispatchResult {`
- That won't work given that the closure passed to `with_transaction` must return `TransactionOutcome<Result<T, E>>` with `E: From<DispatchError>`, not a bare `DispatchResult`.
- But given that we will always return …
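To make the commit-or-rollback pattern under discussion concrete, here is a standalone model of `with_transaction`: the real `frame_support` function runs the closure against an overlay storage layer, while this sketch substitutes a `HashMap` snapshot, so the behavior of `process` can be seen in isolation: keep the writes on success, discard them on error, return the result either way.

```rust
use std::collections::HashMap;

enum TransactionOutcome<T> {
    Commit(T),
    Rollback(T),
}

// Toy stand-in for frame_support::storage::with_transaction.
fn with_transaction<T>(
    storage: &mut HashMap<String, u64>,
    f: impl FnOnce(&mut HashMap<String, u64>) -> TransactionOutcome<T>,
) -> T {
    let snapshot = storage.clone(); // cheap stand-in for a storage layer
    match f(storage) {
        TransactionOutcome::Commit(value) => value,
        TransactionOutcome::Rollback(value) => {
            *storage = snapshot; // discard every write made by the closure
            value
        }
    }
}

fn main() {
    let mut storage: HashMap<String, u64> = HashMap::new();
    // Mirrors `process`: writes happen first, the result decides their fate.
    let res: Result<(), &str> = with_transaction(&mut storage, |s| {
        s.insert("pending".into(), 1);
        let res: Result<(), &str> = Err("handler failed");
        if res.is_ok() {
            TransactionOutcome::Commit(res)
        } else {
            TransactionOutcome::Rollback(res)
        }
    });
    assert!(res.is_err());
    assert!(storage.is_empty()); // the write was rolled back
}
```

Note that, as in the diff, the closure itself never fails: failure travels inside the returned value, which is why the `.expect(..)` on the real call can never trigger.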
Changes in the second file:
```diff
@@ -323,6 +323,7 @@ impl<T: Config> Pallet<T> {
         session_id: T::SessionId,
         expected_proof_count: u32,
         domain_address: DomainAddress,
+        counter: &mut u64,
     ) -> DispatchResult {
         let mut message = None;
         let mut votes = 0;
```
```diff
@@ -333,7 +334,11 @@ impl<T: Config> Pallet<T> {
             // we can return.
             None => return Ok(()),
             Some(stored_inbound_entry) => match stored_inbound_entry {
-                InboundEntry::Message(message_entry) => message = Some(message_entry.message),
+                InboundEntry::Message(message_entry)
+                    if message_entry.session_id == session_id =>
+                {
+                    message = Some(message_entry.message)
+                }
                 InboundEntry::Proof(proof_entry)
                     if proof_entry.has_valid_vote_for_session(session_id) =>
                 {
```

Review thread on the new `session_id` guard:

- Question: If the entry was stored under an old session id, should we also remove it here?
- We should not, because then it's impossible to replay the message and funds are stuck on the EVM side. Entries kept for an old session id can be made unstuck via message recovery.
- Bumping this again since I think we might need to add the 1st point that I mentioned above. cc @hieronx
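The effect of the new guard in a self-contained sketch with stand-in types: a stored message only becomes executable when it was written under the current session id, while a stale entry simply yields nothing and stays in storage (which is what the replay argument above relies on).

```rust
// Simplified stand-in for the pallet's InboundEntry.
enum InboundEntry {
    Message { session_id: u64, message: &'static str },
    Proof { session_id: u64 },
}

fn collect_message(entry: InboundEntry, current_session: u64) -> Option<&'static str> {
    match entry {
        // Guard: only a message stored under the current session is used.
        InboundEntry::Message { session_id, message } if session_id == current_session => {
            Some(message)
        }
        // Stale messages and proofs yield no executable message here.
        _ => None,
    }
}

fn main() {
    let fresh = InboundEntry::Message { session_id: 2, message: "m" };
    let stale = InboundEntry::Message { session_id: 1, message: "m" };
    assert_eq!(collect_message(fresh, 2), Some("m"));
    assert_eq!(collect_message(stale, 2), None); // ignored, not executed
    assert_eq!(collect_message(InboundEntry::Proof { session_id: 2 }, 2), None);
}
```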
```diff
@@ -349,9 +354,13 @@ impl<T: Config> Pallet<T> {
         }
 
         if let Some(msg) = message {
-            Self::execute_post_voting_dispatch(message_hash, router_ids, expected_proof_count)?;
+            for submessage in msg.submessages() {
+                counter.ensure_add_assign(1)?;
 
-            T::InboundMessageHandler::handle(domain_address.clone(), msg)?;
+                T::InboundMessageHandler::handle(domain_address.clone(), submessage)?;
+            }
+
+            Self::execute_post_voting_dispatch(message_hash, router_ids, expected_proof_count)?;
 
             Self::deposit_event(Event::<T>::InboundMessageExecuted {
                 domain_address,
```
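`ensure_add_assign` is the checked `+=` from `sp_arithmetic`'s ensure-ops traits: it returns an arithmetic error instead of wrapping on overflow. A standalone equivalent built on plain `checked_add`, showing how the counter ends up proportional to the number of handled submessages:

```rust
// Checked counter bookkeeping, modeled without Substrate dependencies.
fn handle_submessages(submessages: &[&str], counter: &mut u64) -> Result<(), &'static str> {
    for submessage in submessages {
        // Equivalent of `counter.ensure_add_assign(1)?`.
        *counter = counter.checked_add(1).ok_or("arithmetic overflow")?;
        // The real code dispatches each submessage to the inbound handler here.
        let _ = submessage;
    }
    Ok(())
}

fn main() {
    let mut counter = 0u64;
    handle_submessages(&["transfer", "add_pool"], &mut counter).unwrap();
    assert_eq!(counter, 2); // the weight charged later scales with this value
}
```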
```diff
@@ -406,37 +415,33 @@ impl<T: Config> Pallet<T> {
         let router_ids = Self::get_router_ids_for_domain(domain_address.domain())?;
         let session_id = SessionIdStore::<T>::get();
         let expected_proof_count = Self::get_expected_proof_count(&router_ids)?;
-        let message_hash = message.get_message_hash();
-        let inbound_entry: InboundEntry<T> = InboundEntry::create(
-            message.clone(),
-            session_id,
-            domain_address.clone(),
-            expected_proof_count,
-        );
-
-        inbound_entry.validate(&router_ids, &router_id.clone())?;
-
-        Self::upsert_pending_entry(message_hash, &router_id, inbound_entry)?;
-
-        Self::deposit_processing_event(
-            domain_address.clone(),
-            message,
-            message_hash,
-            router_id.clone(),
-        );
-
-        Self::execute_if_requirements_are_met(
-            message_hash,
-            &router_ids,
-            session_id,
-            expected_proof_count,
-            domain_address.clone(),
-        )?;
+        for submessage in message.submessages() {
+            counter.ensure_add_assign(1)?;
+
+            let message_hash = submessage.get_message_hash();
+
+            let inbound_entry: InboundEntry<T> = InboundEntry::create(
+                submessage.clone(),
+                session_id,
+                domain_address.clone(),
+                expected_proof_count,
+            );
+
+            inbound_entry.validate(&router_ids, &router_id.clone())?;
+
+            Self::upsert_pending_entry(message_hash, &router_id, inbound_entry)?;
+
+            Self::deposit_processing_event(
+                domain_address.clone(),
+                submessage,
+                message_hash,
+                router_id.clone(),
+            );
+
+            Self::execute_if_requirements_are_met(
+                message_hash,
+                &router_ids,
+                session_id,
+                expected_proof_count,
+                domain_address.clone(),
+                counter,
+            )?;
+        }
 
         Ok(())
     }
```
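The structural change in this hunk: one pending entry per submessage, each keyed by its own hash, instead of a single entry for the whole batch. A stand-in sketch of that bookkeeping, using `DefaultHasher` in place of the pallet's real message-hash function and strings in place of messages:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Toy replacement for `get_message_hash`.
fn message_hash(payload: &str) -> u64 {
    let mut h = DefaultHasher::new();
    payload.hash(&mut h);
    h.finish()
}

fn main() {
    let batch = ["transfer", "add_tranche"]; // submessages of one batch
    let mut pending: HashMap<u64, &str> = HashMap::new();
    for submessage in batch {
        // One entry per submessage, so proofs can be matched individually.
        pending.insert(message_hash(submessage), submessage);
    }
    assert_eq!(pending.len(), 2);
}
```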
Review thread on collecting and sorting the pending message keys:

- Since we are collecting the keys here, it also made sense to me to sort them, just in case. Please let me know if there are any objections to this.
- Although this new solution is simpler, I think there is a problem here: it can collect many keys, making the block impossible to build. I think we need a more complex structure here that allows us to store them already organized.
- Can you elaborate on this, please?
- How about limiting the number of keys that we collect via `take(MAX_MESSAGES_PER_BLOCK)`?
- When you collect, the iterator will make one read per item, and the number of items could exceed the block weight capacity. The `take(MAX_MESSAGES_PER_BLOCK)` still does not work, because a message that ideally should be processed first could be left in the queue.
- Not sure if some ordered structure for this exists in Substrate. If not, we would have to create some complex/annoying structure to organize the way the messages are stored. I'm not able to see a super simple way, TBH. We can leave that fix for another PR to unblock this if we see it's not easy.
- Maybe there's a simpler solution that involves using the latest message nonce. I'll try something on a different branch.
- Something similar to #1992.
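A hedged sketch of the nonce idea floated at the end of the thread (hypothetical, not the actual code in #1992): if each stored message is keyed by an incrementing nonce in an ordered map, a bounded per-block drain processes the oldest messages first, avoiding both the unbounded `collect()` and the skipped-message problem of a bare `take(MAX_MESSAGES_PER_BLOCK)` over unordered keys.

```rust
use std::collections::BTreeMap;

// Illustrative bound only; not a real pallet constant.
const MAX_MESSAGES_PER_BLOCK: usize = 2;

// BTreeMap iterates keys in ascending order, so the oldest nonces come first
// and the bound can never skip over an older message.
fn drain_in_order(queue: &mut BTreeMap<u64, &'static str>) -> Vec<&'static str> {
    let nonces: Vec<u64> = queue.keys().take(MAX_MESSAGES_PER_BLOCK).copied().collect();
    nonces.into_iter().filter_map(|n| queue.remove(&n)).collect()
}

fn main() {
    let mut queue = BTreeMap::new();
    queue.insert(7u64, "third");
    queue.insert(3, "first");
    queue.insert(5, "second");
    assert_eq!(drain_in_order(&mut queue), vec!["first", "second"]);
    assert_eq!(drain_in_order(&mut queue), vec!["third"]); // next block
}
```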