From 2414a5a5316929ee74826adbd4d5dcaecf3841f0 Mon Sep 17 00:00:00 2001 From: lemunozm Date: Wed, 7 Aug 2024 08:07:32 +0200 Subject: [PATCH] add batch logic --- libs/traits/src/liquidity_pools.rs | 34 +- libs/utils/src/lib.rs | 20 ++ .../axelar-gateway-precompile/src/lib.rs | 2 +- pallets/liquidity-pools-gateway/src/lib.rs | 323 ++++++++---------- pallets/liquidity-pools-gateway/src/tests.rs | 160 ++------- .../liquidity-pools-gateway/src/weights.rs | 28 +- pallets/liquidity-pools/src/lib.rs | 4 + pallets/liquidity-pools/src/message.rs | 26 +- 8 files changed, 279 insertions(+), 318 deletions(-) diff --git a/libs/traits/src/liquidity_pools.rs b/libs/traits/src/liquidity_pools.rs index 94f90c55a4..8d08b2d5ac 100644 --- a/libs/traits/src/liquidity_pools.rs +++ b/libs/traits/src/liquidity_pools.rs @@ -21,8 +21,21 @@ use sp_std::vec::Vec; /// An encoding & decoding trait for the purpose of meeting the /// LiquidityPools General Message Passing Format pub trait LPEncoding: Sized { + const MAX_PACKED_MESSAGES: u32; + fn serialize(&self) -> Vec; fn deserialize(input: &[u8]) -> Result; + + /// Compose this message with a new one + fn pack(&self, other: Self) -> Result; + + /// Decompose the message into a list of messages + /// If the message is not decomposable, it returns the own message. + fn unpack(&self) -> Vec; + + /// Creates an empty message. + /// It's the identity message for composing messages + fn empty() -> Self; } #[cfg(any(test, feature = "std"))] @@ -32,9 +45,14 @@ pub mod test_util { use super::*; + pub const DECODING_ERR_MSG: &str = "decoding message error"; + pub const EMPTY_ERR_MSG: &str = "empty message error error"; + #[derive(Default, Debug, Eq, PartialEq, Clone, Encode, Decode, TypeInfo, MaxEncodedLen)] pub struct Message; impl LPEncoding for Message { + const MAX_PACKED_MESSAGES: u32 = 1; + fn serialize(&self) -> Vec { vec![0x42] } @@ -42,10 +60,22 @@ pub mod test_util { fn deserialize(input: &[u8]) -> Result { match input.first() { Some(0x42) => Ok(Self), - Some(_) => Err("unsupported message".into()), - None => Err("empty message".into()), + Some(_) => Err(DECODING_ERR_MSG.into()), + None => Err(EMPTY_ERR_MSG.into()), } } + + fn pack(&self, _: Self) -> Result { + unimplemented!() + } + + fn unpack(&self) -> Vec { + vec![Self] + } + + fn empty() -> Self { + unimplemented!() + } } } diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index a1f8bc7412..189cfe8b7e 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -16,6 +16,26 @@ use parity_scale_codec::Encode; use sp_std::cmp::min; +pub struct BufferReader<'a>(pub &'a [u8]); + +impl<'a> BufferReader<'a> { + pub fn read_bytes(&mut self, bytes: usize) -> Option<&[u8]> { + if self.0.len() < bytes { + return None; + } + + let (consumed, remaining) = self.0.split_at(bytes); + self.0 = remaining; + Some(consumed) + } + + pub fn read_array(&mut self) -> Option<&[u8; N]> { + let (consumed, remaining) = self.0.split_first_chunk::()?; + self.0 = remaining; + Some(consumed) + } +} + /// Build a fixed-size array using as many elements from `src` as possible /// without overflowing and ensuring that the array is 0 padded in the case /// where `src.len()` is smaller than S. diff --git a/pallets/liquidity-pools-gateway/axelar-gateway-precompile/src/lib.rs b/pallets/liquidity-pools-gateway/axelar-gateway-precompile/src/lib.rs index dcd1d3b134..fddc1fec92 100644 --- a/pallets/liquidity-pools-gateway/axelar-gateway-precompile/src/lib.rs +++ b/pallets/liquidity-pools-gateway/axelar-gateway-precompile/src/lib.rs @@ -287,7 +287,7 @@ where exit_status: ExitError::Other("account bytes mismatch for domain".into()), })?; - match pallet_liquidity_pools_gateway::Pallet::::process_msg( + match pallet_liquidity_pools_gateway::Pallet::::receive_message( pallet_liquidity_pools_gateway::GatewayOrigin::Domain(domain_address).into(), msg, ) diff --git a/pallets/liquidity-pools-gateway/src/lib.rs b/pallets/liquidity-pools-gateway/src/lib.rs index d38415409b..8d7a62c677 100644 --- a/pallets/liquidity-pools-gateway/src/lib.rs +++ b/pallets/liquidity-pools-gateway/src/lib.rs @@ -38,7 +38,7 @@ use cfg_traits::{ }; use cfg_types::domain_address::{Domain, DomainAddress}; use frame_support::{dispatch::DispatchResult, pallet_prelude::*, PalletError}; -use frame_system::pallet_prelude::OriginFor; +use frame_system::pallet_prelude::{ensure_signed, OriginFor}; use message::GatewayMessage; use orml_traits::GetByKey; pub use pallet::*; @@ -208,6 +208,13 @@ pub mod pallet { pub type DomainHookAddress = StorageMap<_, Blake2_128Concat, Domain, [u8; 20], OptionQuery>; + /// Stores a packed message, not ready yet to be enqueue. + /// Lifetime handled by `start_pack_messages()` and `end_pack_messages()` + /// extrinsics. + #[pallet::storage] + pub(crate) type PackedMessage = + StorageMap<_, Blake2_128Concat, (T::AccountId, Domain), T::Message>; + #[pallet::error] pub enum Error { /// Router initialization failed. @@ -219,9 +226,6 @@ pub mod pallet { /// The domain is not supported. DomainNotSupported, - /// Message decoding error. - MessageDecodingFailed, - /// Instance was already added to the domain. InstanceAlreadyAdded, @@ -246,6 +250,14 @@ pub mod pallet { /// Decoding that is essential and this error /// signals malforming of the wrapping information. RelayerMessageDecodingFailed { reason: RelayerMessageDecodingError }, + + /// Emitted when you call `start_pack_messages()` but that was already + /// called. You should finalize the message with `end_pack_messages()` + MessagePackingAlreadyStarted, + + /// Emitted when you can `end_pack_messages()` but the packing process + /// was not started by `start_pack_messages()`. + MessagePackingNotStarted, } #[pallet::call] @@ -356,16 +368,14 @@ pub mod pallet { } /// Process an inbound message. - #[pallet::weight(T::WeightInfo::process_msg())] + #[pallet::weight(T::WeightInfo::receive_message())] #[pallet::call_index(5)] - pub fn process_msg( + pub fn receive_message( origin: OriginFor, msg: BoundedVec, ) -> DispatchResult { - let (domain_address, incoming_msg) = match T::LocalEVMOrigin::ensure_origin(origin)? { - GatewayOrigin::Domain(domain_address) => { - Pallet::::validate(domain_address, msg)? - } + let (origin_address, incoming_msg) = match T::LocalEVMOrigin::ensure_origin(origin)? { + GatewayOrigin::Domain(domain_address) => (domain_address, msg), GatewayOrigin::AxelarRelay(domain_address) => { // Every axelar relay address has a separate storage ensure!( @@ -376,88 +386,58 @@ pub mod pallet { // Every axelar relay will prepend the (sourceChain, // sourceAddress) from actual origination chain to the // message bytes, with a length identifier - let slice_ref = &mut msg.as_slice(); - let length_source_chain: usize = Pallet::::try_range( - slice_ref, - BYTES_U32, - Error::::from(MalformedSourceChainLength), - |be_bytes_u32| { - let mut bytes = [0u8; BYTES_U32]; - // NOTE: This can NEVER panic as the `try_range` logic ensures the given - // bytes have the right length. I.e. 4 in this case - bytes.copy_from_slice(be_bytes_u32); - - u32::from_be_bytes(bytes).try_into().map_err(|_| { - DispatchError::Other("Expect: usize in wasm is always ge u32") - }) - }, - )?; - - let source_chain = Pallet::::try_range( - slice_ref, - length_source_chain, - Error::::from(MalformedSourceChain), - |source_chain| Ok(source_chain.to_vec()), - )?; - - let length_source_address: usize = Pallet::::try_range( - slice_ref, - BYTES_U32, - Error::::from(MalformedSourceAddressLength), - |be_bytes_u32| { - let mut bytes = [0u8; BYTES_U32]; - // NOTE: This can NEVER panic as the `try_range` logic ensures the given - // bytes have the right length. I.e. 4 in this case - bytes.copy_from_slice(be_bytes_u32); - - u32::from_be_bytes(bytes).try_into().map_err(|_| { - DispatchError::Other("Expect: usize in wasm is always ge u32") - }) - }, - )?; - - let source_address = Pallet::::try_range( - slice_ref, - length_source_address, - Error::::from(MalformedSourceAddress), - |source_address| { + let mut input = cfg_utils::BufferReader(msg.as_slice()); + + let length_source_chain = match input.read_array::() { + Some(bytes) => u32::from_be_bytes(*bytes), + None => Err(Error::::from(MalformedSourceChainLength))?, + }; + + let source_chain = match input.read_bytes(length_source_chain as usize) { + Some(bytes) => bytes.to_vec(), + None => Err(Error::::from(MalformedSourceChain))?, + }; + + let length_source_address = match input.read_array::() { + Some(bytes) => u32::from_be_bytes(*bytes), + None => Err(Error::::from(MalformedSourceAddressLength))?, + }; + + let source_address = match input.read_bytes(length_source_address as usize) { + Some(bytes) => { // NOTE: Axelar simply provides the hexadecimal string of an EVM - // address as the `sourceAddress` argument. Solidity does on the - // other side recognize the hex-encoding and encode the hex bytes - // to utf-8 bytes. + // address as the `sourceAddress` argument. Solidity does on + // the other side recognize the hex-encoding and + // encode the hex bytes to utf-8 bytes. // // Hence, we are reverting this process here. - let source_address = - cfg_utils::decode_var_source::(source_address) - .ok_or(Error::::from(MalformedSourceAddress))?; - - Ok(source_address.to_vec()) - }, - )?; - - let origin_msg = Pallet::::try_range( - slice_ref, - slice_ref.len(), - Error::::from(MalformedMessage), - |msg| { - BoundedVec::try_from(msg.to_vec()).map_err(|_| { - DispatchError::Other( - "Remaining bytes smaller vector in the first place. qed.", - ) - }) - }, - )?; - - let origin_domain = - T::OriginRecovery::try_convert((source_chain, source_address))?; - - Pallet::::validate(origin_domain, origin_msg)? + cfg_utils::decode_var_source::(bytes) + .ok_or(Error::::from(MalformedSourceAddress))? + .to_vec() + } + None => Err(Error::::from(MalformedSourceAddress))?, + }; + + ( + T::OriginRecovery::try_convert((source_chain, source_address))?, + BoundedVec::try_from(input.0.to_vec()) + .map_err(|_| Error::::from(MalformedMessage))?, + ) } }; + if let DomainAddress::Centrifuge(_) = origin_address { + return Err(Error::::InvalidMessageOrigin.into()); + } + + ensure!( + Allowlist::::contains_key(origin_address.domain(), origin_address.clone()), + Error::::UnknownInstance, + ); + let gateway_message = GatewayMessage::::Inbound { - domain_address, - message: incoming_msg, + domain_address: origin_address, + message: T::Message::deserialize(&incoming_msg)?, }; T::MessageQueue::submit(gateway_message) @@ -485,46 +465,48 @@ pub mod pallet { Ok(()) } - } - impl Pallet { - pub(crate) fn try_range<'a, D, F>( - slice: &mut &'a [u8], - next_steps: usize, - error: Error, - transformer: F, - ) -> Result - where - F: Fn(&'a [u8]) -> Result, - { - ensure!(slice.len() >= next_steps, error); - - let (input, new_slice) = slice.split_at(next_steps); - let res = transformer(input)?; - *slice = new_slice; - - Ok(res) + /// Start packing messages in a single message instead of enqueue + /// messages. + /// The message will be enqueued once `end_pack_messages()` is called. + #[pallet::weight(T::WeightInfo::start_pack_messages())] + #[pallet::call_index(9)] + pub fn start_pack_messages(origin: OriginFor, destination: Domain) -> DispatchResult { + let sender = ensure_signed(origin)?; + + PackedMessage::::mutate((&sender, &destination), |msg| match msg { + Some(_) => Err(Error::::MessagePackingAlreadyStarted.into()), + None => { + *msg = Some(T::Message::empty()); + Ok(()) + } + }) } - fn validate( - address: DomainAddress, - msg: BoundedVec, - ) -> Result<(DomainAddress, T::Message), DispatchError> { - if let DomainAddress::Centrifuge(_) = address { - return Err(Error::::InvalidMessageOrigin.into()); + /// End packing messages. + /// If exists any packed message it will be enqueued. + #[pallet::weight(T::WeightInfo::end_pack_messages())] + #[pallet::call_index(10)] + pub fn end_pack_messages(origin: OriginFor, destination: Domain) -> DispatchResult { + let sender = ensure_signed(origin)?; + + match PackedMessage::::take((&sender, &destination)) { + Some(msg) if msg.unpack().is_empty() => Ok(()), //No-op + Some(message) => { + let gateway_message = GatewayMessage::::Outbound { + sender: T::Sender::get(), + destination, + message, + }; + + T::MessageQueue::submit(gateway_message) + } + None => Err(Error::::MessagePackingNotStarted.into()), } - - ensure!( - Allowlist::::contains_key(address.domain(), address.clone()), - Error::::UnknownInstance, - ); - - let incoming_msg = T::Message::deserialize(msg.as_slice()) - .map_err(|_| Error::::MessageDecodingFailed)?; - - Ok((address, incoming_msg)) } + } + impl Pallet { /// Sends the message to the `InboundMessageHandler`. fn process_inbound_message( domain_address: DomainAddress, @@ -533,10 +515,17 @@ pub mod pallet { let weight = Weight::from_parts(0, T::Message::max_encoded_len() as u64) .saturating_add(LP_DEFENSIVE_WEIGHT); - match T::InboundMessageHandler::handle(domain_address, message) { - Ok(_) => (Ok(()), weight), - Err(e) => (Err(e), weight), + let mut count = 0; + for submessage in message.unpack() { + count += 1; + + if let Err(e) = T::InboundMessageHandler::handle(domain_address.clone(), submessage) + { + return (Err(e), weight.saturating_mul(count)); + } } + + (Ok(()), weight.saturating_mul(count)) } /// Retrieves the router stored for the provided domain, sends the @@ -547,55 +536,29 @@ pub mod pallet { domain: Domain, message: T::Message, ) -> (DispatchResult, Weight) { - let mut weight = T::DbWeight::get().reads(1); + let read_weight = T::DbWeight::get().reads(1); - let router = match DomainRouters::::get(domain) { - Some(r) => r, - None => return (Err(Error::::RouterNotFound.into()), weight), + let Some(router) = DomainRouters::::get(domain) else { + return (Err(Error::::RouterNotFound.into()), read_weight); }; - match router.send(sender.clone(), message.serialize()) { - Ok(dispatch_info) => Self::update_total_post_dispatch_info_weight( - &mut weight, - dispatch_info.actual_weight, - ), - Err(e) => { - Self::update_total_post_dispatch_info_weight( - &mut weight, - e.post_info.actual_weight, - ); - - return (Err(e.error), weight); - } - } + let serialized = message.serialize(); + let serialized_len = serialized.len() as u64; - (Ok(()), weight) - } - - /// Updates the provided `PostDispatchInfo` with the weight required to - /// process an outbound message. - pub(crate) fn update_total_post_dispatch_info_weight( - weight: &mut Weight, - router_call_weight: Option, - ) { - let router_call_weight = - Self::get_outbound_message_processing_weight(router_call_weight); - - *weight = weight.saturating_add(router_call_weight); - } + // TODO: do we really need to return the weights in send() if later we use the + // defensive ones? + let (result, router_weight) = match router.send(sender, serialized) { + Ok(dispatch_info) => (Ok(()), dispatch_info.actual_weight), + Err(e) => (Err(e.error), e.post_info.actual_weight), + }; - /// Calculates the weight used by a router when processing an outbound - /// message. - fn get_outbound_message_processing_weight(router_call_weight: Option) -> Weight { - let pov_weight: u64 = (Domain::max_encoded_len() - + T::AccountId::max_encoded_len() - + T::Message::max_encoded_len()) - .try_into() - .expect("can calculate outbound message POV weight"); - - router_call_weight - .unwrap_or(Weight::from_parts(LP_DEFENSIVE_WEIGHT_REF_TIME, 0)) - .saturating_add(Weight::from_parts(0, pov_weight)) + ( + result, + router_weight + .unwrap_or(Weight::from_parts(LP_DEFENSIVE_WEIGHT_REF_TIME, 0)) + .saturating_add(read_weight) + .saturating_add(Weight::from_parts(0, serialized_len)), + ) } } @@ -611,7 +574,7 @@ pub mod pallet { type Sender = T::AccountId; fn handle( - _sender: Self::Sender, + from: Self::Sender, destination: Self::Destination, message: Self::Message, ) -> DispatchResult { @@ -620,13 +583,27 @@ pub mod pallet { Error::::DomainNotSupported ); - let gateway_message = GatewayMessage::::Outbound { - sender: T::Sender::get(), - destination, - message, - }; + ensure!( + DomainRouters::::contains_key(destination.clone()), + Error::::RouterNotFound + ); - T::MessageQueue::submit(gateway_message) + match PackedMessage::::get((&from, &destination)) { + Some(packed) => { + PackedMessage::::insert((from, destination), packed.pack(message)?); + Ok(()) + } + None => { + // Ok, we use the gateway sender. + let gateway_message = GatewayMessage::::Outbound { + sender: T::Sender::get(), + destination, + message, + }; + + T::MessageQueue::submit(gateway_message) + } + } } } diff --git a/pallets/liquidity-pools-gateway/src/tests.rs b/pallets/liquidity-pools-gateway/src/tests.rs index e0c1428548..3a14baa482 100644 --- a/pallets/liquidity-pools-gateway/src/tests.rs +++ b/pallets/liquidity-pools-gateway/src/tests.rs @@ -1,7 +1,8 @@ use cfg_mocks::*; use cfg_primitives::LP_DEFENSIVE_WEIGHT; use cfg_traits::liquidity_pools::{ - test_util::Message, LPEncoding, MessageProcessor, OutboundMessageHandler, + test_util::{Message, DECODING_ERR_MSG}, + LPEncoding, MessageProcessor, OutboundMessageHandler, }; use cfg_types::domain_address::*; use frame_support::{ @@ -39,97 +40,6 @@ mod utils { use utils::*; -mod pallet_internals { - - use super::*; - - #[test] - fn try_range_fails_if_slice_to_short() { - new_test_ext().execute_with(|| { - let three_bytes = [0u8; 3]; - let steps = 4usize; - - assert_noop!( - Pallet::::try_range( - &mut three_bytes.as_slice(), - steps, - Error::::MessageDecodingFailed, - |_| Ok(()) - ), - Error::::MessageDecodingFailed - ); - }) - } - - #[test] - fn try_range_updates_slice_ref_correctly() { - new_test_ext().execute_with(|| { - let bytes = [1, 2, 3, 4, 5, 6, 7u8]; - let slice = &mut bytes.as_slice(); - let steps = 4; - let first_section = Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |first_section| Ok(first_section), - ) - .expect("Slice is long enough"); - - assert_eq!(first_section, &[1, 2, 3, 4]); - - let steps = 2; - let second_section = Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |second_section| Ok(second_section), - ) - .expect("Slice is long enough"); - - assert_eq!(&second_section, &[5, 6]); - - let steps = 1; - let third_section = Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |third_section| Ok(third_section), - ) - .expect("Slice is long enough"); - - assert_eq!(&third_section, &[7]); - }) - } - - #[test] - fn try_range_does_not_update_slice_if_transformer_errors() { - new_test_ext().execute_with(|| { - let bytes = [1, 2, 3, 4, 5, 6, 7u8]; - let slice = &mut bytes.as_slice(); - let steps = 4; - let first_section = Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |first_section| Ok(first_section), - ) - .expect("Slice is long enough"); - - assert_eq!(first_section, &[1, 2, 3, 4]); - - let steps = 1; - assert!(Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |_| Err::<(), _>(DispatchError::Corruption) - ) - .is_err()); - assert_eq!(slice, &[5, 6, 7]); - }) - } -} - mod set_domain_router { use super::*; @@ -429,7 +339,7 @@ mod remove_instance { } } -mod process_msg_axelar_relay { +mod receive_message_axelar_relay { use sp_core::bounded::BoundedVec; use super::*; @@ -478,7 +388,7 @@ mod process_msg_axelar_relay { let solidity_header = "0000000a657468657265756d2d320000002a307838353033623434353242663632333863433736436462454532323362343664373139366231633933"; let payload = [hex::decode(solidity_header).unwrap(), Message.serialize()].concat(); - assert_ok!(LiquidityPoolsGateway::process_msg( + assert_ok!(LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(payload).unwrap() )); @@ -530,7 +440,7 @@ mod process_msg_axelar_relay { msg.extend_from_slice(&SOURCE_ADDRESS); msg.extend_from_slice(&message.serialize()); - assert_ok!(LiquidityPoolsGateway::process_msg( + assert_ok!(LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(msg).unwrap() )); @@ -583,7 +493,7 @@ mod process_msg_axelar_relay { msg.extend_from_slice(&SOURCE_ADDRESS); msg.extend_from_slice(&message.serialize()); - assert_ok!(LiquidityPoolsGateway::process_msg( + assert_ok!(LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(msg).unwrap() )); @@ -621,7 +531,7 @@ mod process_msg_axelar_relay { }); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(msg).unwrap() ), @@ -665,7 +575,7 @@ mod process_msg_axelar_relay { }); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(msg).unwrap() ), @@ -704,11 +614,11 @@ mod process_msg_axelar_relay { }); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), - Error::::MessageDecodingFailed, + DispatchError::Other(DECODING_ERR_MSG), ); }); } @@ -742,7 +652,7 @@ mod process_msg_axelar_relay { MockLiquidityPoolsGatewayQueue::mock_submit(move |_| Err(err)); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -752,7 +662,7 @@ mod process_msg_axelar_relay { } } -mod process_msg_domain { +mod receive_message_domain { use super::*; #[test] @@ -779,7 +689,7 @@ mod process_msg_domain { Ok(()) }); - assert_ok!(LiquidityPoolsGateway::process_msg( + assert_ok!(LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() )); @@ -792,7 +702,7 @@ mod process_msg_domain { let encoded_msg = Message.serialize(); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( RuntimeOrigin::root(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -808,7 +718,7 @@ mod process_msg_domain { let encoded_msg = Message.serialize(); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -825,7 +735,7 @@ mod process_msg_domain { let encoded_msg = Message.serialize(); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -848,11 +758,11 @@ mod process_msg_domain { let encoded_msg: Vec = vec![11]; assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), - Error::::MessageDecodingFailed, + DispatchError::Other(DECODING_ERR_MSG), ); }); } @@ -884,7 +794,7 @@ mod process_msg_domain { }); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -1125,23 +1035,19 @@ mod message_processor_impl { DomainRouters::::insert(domain.clone(), router_mock); - let mut expected_weight = - ::DbWeight::get().reads(1); - - Pallet::::update_total_post_dispatch_info_weight( - &mut expected_weight, - router_post_info.actual_weight, - ); + let min_expected_weight = ::DbWeight::get() + .reads(1) + router_post_info.actual_weight.unwrap() + + Weight::from_parts(0, message.serialize().len() as u64); let gateway_message = GatewayMessage::::Outbound { sender, destination: domain, - message, + message: message.clone(), }; let (res, weight) = LiquidityPoolsGateway::process(gateway_message); assert_ok!(res); - assert_eq!(weight, expected_weight); + assert!(weight.all_lte(min_expected_weight)); }); } @@ -1188,31 +1094,27 @@ mod message_processor_impl { assert_eq!(mock_sender, expected_sender); assert_eq!(mock_message, expected_message.serialize()); - Err(DispatchErrorWithPostInfo { + Err(dbg!(DispatchErrorWithPostInfo { post_info: router_post_info, error: router_err, - }) + })) }); DomainRouters::::insert(domain.clone(), router_mock); - let mut expected_weight = - ::DbWeight::get().reads(1); - - Pallet::::update_total_post_dispatch_info_weight( - &mut expected_weight, - router_post_info.actual_weight, - ); + let min_expected_weight = ::DbWeight::get() + .reads(1) + router_post_info.actual_weight.unwrap() + + Weight::from_parts(0, message.serialize().len() as u64); let gateway_message = GatewayMessage::::Outbound { sender, destination: domain, - message, + message: message.clone(), }; let (res, weight) = LiquidityPoolsGateway::process(gateway_message); assert_noop!(res, router_err); - assert_eq!(weight, expected_weight); + assert!(weight.all_lte(min_expected_weight)); }); } } diff --git a/pallets/liquidity-pools-gateway/src/weights.rs b/pallets/liquidity-pools-gateway/src/weights.rs index d72e6e0f91..32109817e5 100644 --- a/pallets/liquidity-pools-gateway/src/weights.rs +++ b/pallets/liquidity-pools-gateway/src/weights.rs @@ -18,9 +18,11 @@ pub trait WeightInfo { fn remove_instance() -> Weight; fn add_relayer() -> Weight; fn remove_relayer() -> Weight; - fn process_msg() -> Weight; + fn receive_message() -> Weight; fn process_outbound_message() -> Weight; fn process_failed_outbound_message() -> Weight; + fn start_pack_messages() -> Weight; + fn end_pack_messages() -> Weight; } // NOTE: We use temporary weights here. `execute_epoch` is by far our heaviest @@ -84,7 +86,7 @@ impl WeightInfo for () { .saturating_add(RocksDbWeight::get().writes(1)) } - fn process_msg() -> Weight { + fn receive_message() -> Weight { // NOTE: Defensive hardcoded weight taken from pool_system::execute_epoch. Will // be replaced with real benchmark soon. // @@ -122,4 +124,26 @@ impl WeightInfo for () { .saturating_add(RocksDbWeight::get().reads(2)) .saturating_add(RocksDbWeight::get().writes(1)) } + + fn start_pack_messages() -> Weight { + // TODO: BENCHMARK CORRECTLY + // + // NOTE: Reasonable weight taken from `PoolSystem::set_max_reserve` + // This one has one read and one write for sure and possible one + // read for `AdminOrigin` + Weight::from_parts(30_117_000, 5991) + .saturating_add(RocksDbWeight::get().reads(1)) + .saturating_add(RocksDbWeight::get().writes(1)) + } + + fn end_pack_messages() -> Weight { + // TODO: BENCHMARK CORRECTLY + // + // NOTE: Reasonable weight taken from `PoolSystem::set_max_reserve` + // This one has one read and one write for sure and possible one + // read for `AdminOrigin` + Weight::from_parts(30_117_000, 5991) + .saturating_add(RocksDbWeight::get().reads(2)) + .saturating_add(RocksDbWeight::get().writes(2)) + } } diff --git a/pallets/liquidity-pools/src/lib.rs b/pallets/liquidity-pools/src/lib.rs index 8e55c470c3..3f047f923a 100644 --- a/pallets/liquidity-pools/src/lib.rs +++ b/pallets/liquidity-pools/src/lib.rs @@ -339,6 +339,9 @@ pub mod pallet { NotPoolAdmin, /// The domain hook address could not be found. DomainHookAddressNotFound, + /// This pallet does not expect to receive direclty a batch message, + /// instead it expects several calls to it with different messages. + UnsupportedBatchMessage, } #[pallet::call] @@ -1029,6 +1032,7 @@ pub mod pallet { currency.into(), sender, ), + Message::Batch(_) => Err(Error::::UnsupportedBatchMessage.into()), _ => Err(Error::::InvalidIncomingMessage.into()), }?; diff --git a/pallets/liquidity-pools/src/message.rs b/pallets/liquidity-pools/src/message.rs index fc917eee29..a571469599 100644 --- a/pallets/liquidity-pools/src/message.rs +++ b/pallets/liquidity-pools/src/message.rs @@ -529,9 +529,19 @@ pub enum Message { }, } -impl Message { +impl LPEncoding for Message { + const MAX_PACKED_MESSAGES: u32 = MAX_BATCH_MESSAGES; + + fn serialize(&self) -> Vec { + gmpf::to_vec(self).unwrap_or_default() + } + + fn deserialize(data: &[u8]) -> Result { + gmpf::from_slice(data).map_err(|_| DispatchError::Other("LP Deserialization issue")) + } + /// Compose this message with a new one - pub fn pack(&self, other: Self) -> Result { + fn pack(&self, other: Self) -> Result { Ok(match self.clone() { Message::Batch(content) => { let mut content = content.clone(); @@ -543,21 +553,15 @@ impl Message { } /// Decompose the message into a list of messages - pub fn unpack(&self) -> Vec { + fn unpack(&self) -> Vec { match self { Message::Batch(content) => content.clone().into_iter().collect(), message => vec![message.clone()], } } -} -impl LPEncoding for Message { - fn serialize(&self) -> Vec { - gmpf::to_vec(self).unwrap_or_default() - } - - fn deserialize(data: &[u8]) -> Result { - gmpf::from_slice(data).map_err(|_| DispatchError::Other("LP Deserialization issue")) + fn empty() -> Message { + Message::Batch(BatchMessages::default()) } }