Fix/gateway message processing #1991

Merged
merged 14 commits on Oct 9, 2024
Changes from 11 commits
18 changes: 6 additions & 12 deletions pallets/liquidity-pools-gateway-queue/src/lib.rs
@@ -202,9 +202,13 @@ pub mod pallet {
fn service_message_queue(max_weight: Weight) -> Weight {
let mut weight_used = Weight::zero();

let mut processed_entries = Vec::new();
let mut nonces = MessageQueue::<T>::iter_keys().collect::<Vec<_>>();
nonces.sort();
Contributor Author:

Since we are collecting the keys here, it also made sense to me to sort them, just in case. Please let me know if there are any objections to this.

Contributor:

Although this new solution is simpler, I think there is a problem here:

MessageQueue::<T>::iter_keys().collect::<Vec<_>>();

It can collect many keys, making the block impossible to build.

I think we need a more complex structure here that allows us to store the keys already ordered.

Contributor Author:

> It can collect many keys, making the block impossible to build.

Can you elaborate on this please?

Contributor Author:

How about limiting the number of keys that we collect via:

let mut nonces = MessageQueue::<T>::iter_keys().take(MAX_MESSAGES_PER_BLOCK).collect::<Vec<_>>();

Contributor:

> Can you elaborate on this please?

When you collect, the iterator makes one read per item, and the number of items could exceed the block's weight capacity.

The take(MAX_MESSAGES_PER_BLOCK) still does not work, because a message that ideally should be processed first could be left behind in the queue.

Contributor:

Not sure if an ordered structure for this already exists in Substrate. If not, we would have to create some complex/annoying structure to organize the way the messages are stored.

But I'm not able to see a super simple way, TBH. We can leave that fix for another PR to unlock this one if it turns out not to be easy.

Contributor Author:

Maybe there's a simpler solution that involves using the latest message nonce. I'll try something on a different branch.

Contributor Author:

Something similar to - #1992
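
For context, a rough sketch of that latest-nonce idea, assuming hypothetical LastProcessedNonce and MessageNonceStore storage values (names not taken from this PR or from #1992): walking a nonce range keeps reads bounded by the weight check instead of by the queue size.

fn service_message_queue(max_weight: Weight) -> Weight {
    let mut weight_used = Weight::zero();
    // Hypothetical storage values: the last nonce that was fully processed and
    // the latest nonce ever assigned to a queued message.
    let mut nonce = LastProcessedNonce::<T>::get().saturating_add(1);
    let latest = MessageNonceStore::<T>::get();

    while nonce <= latest {
        // One storage read per nonce; already-removed nonces simply yield `None`.
        if let Some(message) = MessageQueue::<T>::take(nonce) {
            let weight = T::MessageProcessor::max_processing_weight(&message);
            // ... process the message as in the existing loop ...
            weight_used = weight_used.saturating_add(weight);
        }

        LastProcessedNonce::<T>::set(nonce);
        nonce = nonce.saturating_add(1);

        if weight_used.all_gte(max_weight) {
            break;
        }
    }

    weight_used
}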


for nonce in nonces {
let message =
MessageQueue::<T>::take(nonce).expect("valid nonce ensured by `iter_keys`");

for (nonce, message) in MessageQueue::<T>::iter() {
let remaining_weight = max_weight.saturating_sub(weight_used);
let next_weight = T::MessageProcessor::max_processing_weight(&message);

@@ -233,17 +237,7 @@
}
};

processed_entries.push(nonce);

weight_used = weight_used.saturating_add(weight);

if weight_used.all_gte(max_weight) {
break;
}
}

for entry in processed_entries {
MessageQueue::<T>::remove(entry);
}

weight_used
79 changes: 53 additions & 26 deletions pallets/liquidity-pools-gateway/src/lib.rs
@@ -35,7 +35,11 @@ use cfg_traits::liquidity_pools::{
OutboundMessageHandler, RouterProvider,
};
use cfg_types::domain_address::{Domain, DomainAddress};
use frame_support::{dispatch::DispatchResult, pallet_prelude::*};
use frame_support::{
dispatch::{DispatchResult, PostDispatchInfo},
pallet_prelude::*,
storage::{with_transaction, TransactionOutcome},
};
use frame_system::pallet_prelude::{ensure_signed, OriginFor};
use message::GatewayMessage;
use orml_traits::GetByKey;
@@ -460,7 +464,7 @@ pub mod pallet {
domain_address: DomainAddress,
message_hash: MessageHash,
router_id: T::RouterId,
) -> DispatchResult {
) -> DispatchResultWithPostInfo {
T::AdminOrigin::ensure_origin(origin)?;

let router_ids = Self::get_router_ids_for_domain(domain_address.domain())?;
@@ -469,8 +473,8 @@
router_ids.iter().any(|x| x == &router_id),
Error::<T>::UnknownRouter
);
// Message recovery shouldn't be supported for setups that have less than 1
// router since no proofs are required in that case.
// Message recovery shouldn't be supported for setups that have less than 2
// routers since no proofs are required in that case.
ensure!(router_ids.len() > 1, Error::<T>::NotEnoughRoutersForDomain);

let session_id = SessionIdStore::<T>::get();
@@ -495,20 +499,26 @@

let expected_proof_count = Self::get_expected_proof_count(&router_ids)?;

let mut counter = 0u64;

Self::execute_if_requirements_are_met(
message_hash,
&router_ids,
session_id,
expected_proof_count,
domain_address,
&mut counter,
)?;

Self::deposit_event(Event::<T>::MessageRecoveryExecuted {
message_hash,
router_id,
});

Ok(())
Ok(PostDispatchInfo {
actual_weight: Some(Self::get_weight_for_batch_messages(counter)),
pays_fee: Pays::Yes,
})
}

/// Sends a message that initiates a message recovery using the
@@ -587,6 +597,13 @@

T::MessageSender::send(messaging_router, T::Sender::get(), message)
}

fn get_weight_for_batch_messages(count: u64) -> Weight {
match count {
0 => LP_DEFENSIVE_WEIGHT / 10,
n => LP_DEFENSIVE_WEIGHT.saturating_mul(n),
}
}
}

impl<T: Config> OutboundMessageHandler for Pallet<T> {
@@ -621,34 +638,44 @@
type Message = GatewayMessage<T::Message, T::RouterId>;

fn process(msg: Self::Message) -> (DispatchResult, Weight) {
match msg {
GatewayMessage::Inbound {
domain_address,
message,
router_id,
} => {
let mut counter = 0;

let res = Self::process_inbound_message(
// The #[transactional] macro only works for functions that return a
// `DispatchResult`; therefore, we need to manually add this here.
Comment on lines +627 to +628
Contributor:

Q: Any reason for going with the overhead of with_transaction and branching over the result instead of just using #[transactional] and returning Ok(())?

Contributor Author:

Not sure what you mean here - do you mean changing the return of process to DispatchResult?

Contributor Author:

We can't use the #[transactional] macro in this function; that is not supported. We can either change the signature as I suggested in my previous message, or go with the current approach.
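
For illustration only, a minimal sketch of that signature-change alternative, assuming process returned a bare DispatchResult (this is not what the trait requires, so it is not what gets merged here):

#[transactional]
fn process(msg: Self::Message) -> DispatchResult {
    // With a plain `DispatchResult` return type the macro can wrap the whole body,
    // but the weight information returned by the current trait would be lost.
    match msg {
        GatewayMessage::Inbound {
            domain_address,
            message,
            router_id,
        } => {
            let mut counter = 0;
            Self::process_inbound_message(domain_address, message, router_id, &mut counter)
        }
        GatewayMessage::Outbound { message, router_id } => {
            T::MessageSender::send(router_id, T::Sender::get(), message)
        }
    }
}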

with_transaction(|| {
let (res, weight) = match msg {
GatewayMessage::Inbound {
domain_address,
message,
router_id,
&mut counter,
);
} => {
let mut counter = 0;

let weight = match counter {
0 => LP_DEFENSIVE_WEIGHT / 10,
n => LP_DEFENSIVE_WEIGHT.saturating_mul(n),
};
let res = Self::process_inbound_message(
domain_address,
message,
router_id,
&mut counter,
);

(res, weight)
}
GatewayMessage::Outbound { message, router_id } => {
let res = T::MessageSender::send(router_id, T::Sender::get(), message);
(res, Self::get_weight_for_batch_messages(counter))
}
GatewayMessage::Outbound { message, router_id } => {
let res = T::MessageSender::send(router_id, T::Sender::get(), message);

(res, LP_DEFENSIVE_WEIGHT)
(res, LP_DEFENSIVE_WEIGHT)
}
};

if res.is_ok() {
TransactionOutcome::Commit(Ok::<(DispatchResult, Weight), DispatchError>((
res, weight,
)))
} else {
TransactionOutcome::Rollback(Ok::<(DispatchResult, Weight), DispatchError>((
res, weight,
)))
}
Contributor:

Super NIT. I think adding the return type to the closure allows removing the Ok::<stuff> annotations here:

with_transaction(|| -> DispatchResult {

Contributor Author:

That won't work given:

pub fn with_transaction<T, E, F>(f: F) -> Result<T, E>
where
	E: From<DispatchError>,
	F: FnOnce() -> TransactionOutcome<Result<T, E>>, // this
{
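
For completeness, a small illustrative snippet (not code from this PR) of an annotation that does type-check: the closure has to return a TransactionOutcome wrapping the full Result type, not a bare DispatchResult.

with_transaction(
    || -> TransactionOutcome<Result<(DispatchResult, Weight), DispatchError>> {
        // Dummy values standing in for the real processing result.
        let res: DispatchResult = Ok(());
        let weight = LP_DEFENSIVE_WEIGHT;

        if res.is_ok() {
            TransactionOutcome::Commit(Ok((res, weight)))
        } else {
            TransactionOutcome::Rollback(Ok((res, weight)))
        }
    },
)
.expect("the closure always returns Ok")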

Contributor Author:

But given that we will always return LP_DEFENSIVE_WEIGHT for both inbound/outbound, we can change this a bit. I'll ping you when done.

}
})
.expect("success is ensured by the transaction closure")
}

/// Returns the max processing weight for a message, based on its
65 changes: 35 additions & 30 deletions pallets/liquidity-pools-gateway/src/message_processing.rs
@@ -323,6 +323,7 @@ impl<T: Config> Pallet<T> {
session_id: T::SessionId,
expected_proof_count: u32,
domain_address: DomainAddress,
counter: &mut u64,
) -> DispatchResult {
let mut message = None;
let mut votes = 0;
@@ -333,7 +334,11 @@
// we can return.
None => return Ok(()),
Some(stored_inbound_entry) => match stored_inbound_entry {
InboundEntry::Message(message_entry) => message = Some(message_entry.message),
InboundEntry::Message(message_entry)
if message_entry.session_id == session_id =>
Contributor:

Question: If session_id is different, should we remove the entry?

Contributor:

> Question: If session_id is different, should we remove the entry?

We should not, because then it would be impossible to replay the message and funds would be stuck on the EVM side. Entries kept for an old session id can still be unstuck via execute_message_recovery.

Contributor Author:

execute_message_recovery will only increase the proof count for a specific router ID, so we will still hit this logic and be unable to execute a message from an older session. Maybe we should extend execute_message_recovery to either:

  • set the session of a message entry to the current one;
  • increase the proof count (the current behavior).

Contributor Author:

Bumping this again since I think we might need to add the first point that I mentioned above (a rough sketch of that option follows below).

cc @hieronx
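
A hypothetical sketch of that first option, with assumed names (PendingInboundEntries, InboundEntryNotFound) that may not match the actual pallet: an extended execute_message_recovery could move a stored message entry into the current session so a message from an older session becomes executable again.

fn bump_entry_to_current_session(
    message_hash: MessageHash,
    router_id: T::RouterId,
) -> DispatchResult {
    // Assumed storage map of pending inbound entries keyed by message hash and router.
    PendingInboundEntries::<T>::try_mutate(message_hash, router_id, |entry| match entry {
        Some(InboundEntry::Message(message_entry)) => {
            // Re-home the entry in the current session so the normal
            // `execute_if_requirements_are_met` path can pick it up again.
            message_entry.session_id = SessionIdStore::<T>::get();
            Ok(())
        }
        _ => Err(Error::<T>::InboundEntryNotFound.into()),
    })
}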

{
message = Some(message_entry.message)
}
InboundEntry::Proof(proof_entry)
if proof_entry.has_valid_vote_for_session(session_id) =>
{
@@ -349,9 +354,13 @@
}

if let Some(msg) = message {
Self::execute_post_voting_dispatch(message_hash, router_ids, expected_proof_count)?;
for submessage in msg.submessages() {
counter.ensure_add_assign(1)?;

T::InboundMessageHandler::handle(domain_address.clone(), msg)?;
T::InboundMessageHandler::handle(domain_address.clone(), submessage)?;
}

Self::execute_post_voting_dispatch(message_hash, router_ids, expected_proof_count)?;

Self::deposit_event(Event::<T>::InboundMessageExecuted {
domain_address,
@@ -406,37 +415,33 @@
let router_ids = Self::get_router_ids_for_domain(domain_address.domain())?;
let session_id = SessionIdStore::<T>::get();
let expected_proof_count = Self::get_expected_proof_count(&router_ids)?;
let message_hash = message.get_message_hash();
let inbound_entry: InboundEntry<T> = InboundEntry::create(
message.clone(),
session_id,
domain_address.clone(),
expected_proof_count,
);

for submessage in message.submessages() {
counter.ensure_add_assign(1)?;

let message_hash = submessage.get_message_hash();

let inbound_entry: InboundEntry<T> = InboundEntry::create(
submessage.clone(),
session_id,
domain_address.clone(),
expected_proof_count,
);
inbound_entry.validate(&router_ids, &router_id.clone())?;

inbound_entry.validate(&router_ids, &router_id.clone())?;
Self::upsert_pending_entry(message_hash, &router_id, inbound_entry)?;
Self::upsert_pending_entry(message_hash, &router_id, inbound_entry)?;

Self::deposit_processing_event(
domain_address.clone(),
submessage,
message_hash,
router_id.clone(),
);
Self::deposit_processing_event(
domain_address.clone(),
message,
message_hash,
router_id.clone(),
);

Self::execute_if_requirements_are_met(
message_hash,
&router_ids,
session_id,
expected_proof_count,
domain_address.clone(),
)?;
}
Self::execute_if_requirements_are_met(
message_hash,
&router_ids,
session_id,
expected_proof_count,
domain_address.clone(),
counter,
)?;

Ok(())
}