Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: MSP move bucket #276

Merged
merged 20 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions client/blockchain-service/src/commands.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use anyhow::Result;
use async_trait::async_trait;
use log::{debug, warn};
use sc_network::Multiaddr;
use serde_json::Number;
use sp_api::ApiError;
use sp_core::H256;

use pallet_file_system_runtime_api::{
QueryBspConfirmChunksToProveForFileError, QueryFileEarliestVolunteerTickError,
Expand All @@ -17,11 +20,9 @@ use pallet_storage_providers_runtime_api::{
};
use shc_actors_framework::actor::ActorHandle;
use shc_common::types::{
BlockNumber, BucketId, ChunkId, ForestLeaf, MainStorageProviderId, Multiaddresses, ProviderId,
BlockNumber, BucketId, ChunkId, ForestLeaf, MainStorageProviderId, ProviderId,
RandomnessOutput, StorageHubEventsVec, StorageProviderId, TickNumber, TrieRemoveMutation,
};
use sp_api::ApiError;
use sp_core::H256;
use storage_hub_runtime::{AccountId, Balance, StorageDataUnit};

use super::{
Expand Down Expand Up @@ -92,7 +93,7 @@ pub enum BlockchainServiceCommand {
QueryProviderMultiaddresses {
provider_id: ProviderId,
callback:
tokio::sync::oneshot::Sender<Result<Multiaddresses, QueryProviderMultiaddressesError>>,
tokio::sync::oneshot::Sender<Result<Vec<Multiaddr>, QueryProviderMultiaddressesError>>,
},
QueueSubmitProofRequest {
request: SubmitProofRequest,
Expand Down Expand Up @@ -248,8 +249,8 @@ pub trait BlockchainServiceInterface {
/// Query the MSP multiaddresses.
async fn query_provider_multiaddresses(
&self,
msp_id: ProviderId,
) -> Result<Multiaddresses, QueryProviderMultiaddressesError>;
provider_id: ProviderId,
) -> Result<Vec<Multiaddr>, QueryProviderMultiaddressesError>;

/// Queue a SubmitProofRequest to be processed.
async fn queue_submit_proof_request(&self, request: SubmitProofRequest) -> Result<()>;
Expand Down Expand Up @@ -517,7 +518,7 @@ impl BlockchainServiceInterface for ActorHandle<BlockchainService> {
async fn query_provider_multiaddresses(
&self,
provider_id: ProviderId,
) -> Result<Multiaddresses, QueryProviderMultiaddressesError> {
) -> Result<Vec<Multiaddr>, QueryProviderMultiaddressesError> {
let (callback, rx) = tokio::sync::oneshot::channel();
let message = BlockchainServiceCommand::QueryProviderMultiaddresses {
provider_id,
Expand Down
73 changes: 73 additions & 0 deletions client/blockchain-service/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,39 @@ pub struct SpStopStoringInsolventUser {
}
impl EventBusMessage for SpStopStoringInsolventUser {}

#[derive(Debug, Clone)]
pub struct MoveBucketRequested {
pub bucket_id: BucketId,
pub new_msp_id: ProviderId,
}
impl EventBusMessage for MoveBucketRequested {}

#[derive(Debug, Clone)]
pub struct MoveBucketRequestedForNewMsp {
pub bucket_id: BucketId,
}
impl EventBusMessage for MoveBucketRequestedForNewMsp {}

#[derive(Debug, Clone)]
pub struct MoveBucketRejected {
pub bucket_id: BucketId,
pub msp_id: ProviderId,
}
impl EventBusMessage for MoveBucketRejected {}

#[derive(Debug, Clone)]
pub struct MoveBucketAccepted {
pub bucket_id: BucketId,
pub msp_id: ProviderId,
}
impl EventBusMessage for MoveBucketAccepted {}

#[derive(Debug, Clone)]
pub struct MoveBucketExpired {
pub bucket_id: BucketId,
pub msp_id: ProviderId,
}
impl EventBusMessage for MoveBucketExpired {}
/// BSP stopped storing a specific file.
///
/// This event is emitted when a BSP confirm stop storing a file.
Expand Down Expand Up @@ -300,6 +333,11 @@ pub struct BlockchainServiceEventBusProvider {
user_without_funds_event_bus: EventBus<UserWithoutFunds>,
sp_stop_storing_insolvent_user_event_bus: EventBus<SpStopStoringInsolventUser>,
finalised_msp_stopped_storing_bucket_event_bus: EventBus<FinalisedMspStoppedStoringBucket>,
move_bucket_requested_event_bus: EventBus<MoveBucketRequested>,
move_bucket_rejected_event_bus: EventBus<MoveBucketRejected>,
move_bucket_accepted_event_bus: EventBus<MoveBucketAccepted>,
move_bucket_expired_event_bus: EventBus<MoveBucketExpired>,
move_bucket_requested_for_new_msp_event_bus: EventBus<MoveBucketRequestedForNewMsp>,
bsp_stop_storing_event_bus: EventBus<BspConfirmStoppedStoring>,
finalised_bsp_stop_storing_event_bus: EventBus<FinalisedBspConfirmStoppedStoring>,
notify_period_event_bus: EventBus<NotifyPeriod>,
Expand All @@ -323,6 +361,11 @@ impl BlockchainServiceEventBusProvider {
user_without_funds_event_bus: EventBus::new(),
sp_stop_storing_insolvent_user_event_bus: EventBus::new(),
finalised_msp_stopped_storing_bucket_event_bus: EventBus::new(),
move_bucket_requested_event_bus: EventBus::new(),
move_bucket_rejected_event_bus: EventBus::new(),
move_bucket_accepted_event_bus: EventBus::new(),
move_bucket_expired_event_bus: EventBus::new(),
move_bucket_requested_for_new_msp_event_bus: EventBus::new(),
bsp_stop_storing_event_bus: EventBus::new(),
finalised_bsp_stop_storing_event_bus: EventBus::new(),
notify_period_event_bus: EventBus::new(),
Expand Down Expand Up @@ -422,6 +465,36 @@ impl ProvidesEventBus<FinalisedMspStoppedStoringBucket> for BlockchainServiceEve
}
}

impl ProvidesEventBus<MoveBucketRequested> for BlockchainServiceEventBusProvider {
fn event_bus(&self) -> &EventBus<MoveBucketRequested> {
&self.move_bucket_requested_event_bus
}
}

impl ProvidesEventBus<MoveBucketRejected> for BlockchainServiceEventBusProvider {
fn event_bus(&self) -> &EventBus<MoveBucketRejected> {
&self.move_bucket_rejected_event_bus
}
}

impl ProvidesEventBus<MoveBucketAccepted> for BlockchainServiceEventBusProvider {
fn event_bus(&self) -> &EventBus<MoveBucketAccepted> {
&self.move_bucket_accepted_event_bus
}
}

impl ProvidesEventBus<MoveBucketExpired> for BlockchainServiceEventBusProvider {
fn event_bus(&self) -> &EventBus<MoveBucketExpired> {
&self.move_bucket_expired_event_bus
}
}

impl ProvidesEventBus<MoveBucketRequestedForNewMsp> for BlockchainServiceEventBusProvider {
fn event_bus(&self) -> &EventBus<MoveBucketRequestedForNewMsp> {
&self.move_bucket_requested_for_new_msp_event_bus
}
}

impl ProvidesEventBus<BspConfirmStoppedStoring> for BlockchainServiceEventBusProvider {
fn event_bus(&self) -> &EventBus<BspConfirmStoppedStoring> {
&self.bsp_stop_storing_event_bus
Expand Down
41 changes: 38 additions & 3 deletions client/blockchain-service/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ use crate::{
events::{
AcceptedBspVolunteer, BlockchainServiceEventBusProvider, BspConfirmStoppedStoring,
FinalisedBspConfirmStoppedStoring, FinalisedMspStoppedStoringBucket,
FinalisedTrieRemoveMutationsApplied, LastChargeableInfoUpdated, NewStorageRequest,
SlashableProvider, SpStopStoringInsolventUser, UserWithoutFunds,
FinalisedTrieRemoveMutationsApplied, LastChargeableInfoUpdated, MoveBucketAccepted,
MoveBucketExpired, MoveBucketRejected, MoveBucketRequested, MoveBucketRequestedForNewMsp,
NewStorageRequest, SlashableProvider, SpStopStoringInsolventUser, UserWithoutFunds,
},
state::{
BlockchainServiceStateStore, LastProcessedBlockNumberCf,
Expand Down Expand Up @@ -504,7 +505,8 @@ impl Actor for BlockchainService {
.unwrap_or_else(|_| {
error!(target: LOG_TARGET, "Failed to query provider multiaddresses");
Err(QueryProviderMultiaddressesError::InternalError)
});
})
.map(convert_raw_multiaddresses_to_multiaddr);

match callback.send(multiaddresses) {
Ok(_) => {
Expand Down Expand Up @@ -1248,6 +1250,39 @@ impl BlockchainService {
size,
})
}
RuntimeEvent::FileSystem(
pallet_file_system::Event::MoveBucketRequested {
who: _,
bucket_id,
new_msp_id,
},
) => {
self.emit(MoveBucketRequested {
bucket_id,
new_msp_id,
});
if self.provider_ids.contains(&new_msp_id) {
self.emit(MoveBucketRequestedForNewMsp { bucket_id });
}
}
RuntimeEvent::FileSystem(
pallet_file_system::Event::MoveBucketRejected { bucket_id, msp_id },
) => {
self.emit(MoveBucketRejected { bucket_id, msp_id });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just out of curiosity, what happened to events that were not listened by any task? For example an MSP running a node, would emit this event internally, but no one would be listening to it. Did it get cleared somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no subscriber - nothing really, they get dropped.

}
RuntimeEvent::FileSystem(
pallet_file_system::Event::MoveBucketAccepted { bucket_id, msp_id },
) => {
self.emit(MoveBucketAccepted { bucket_id, msp_id });
}
RuntimeEvent::FileSystem(
pallet_file_system::Event::MoveBucketRequestExpired {
bucket_id,
msp_id,
},
) => {
self.emit(MoveBucketExpired { bucket_id, msp_id });
}
Comment on lines +1268 to +1285
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that these are events intended a BSP node, is that right? In that case, when my PR is merged with the changes to self.provider_ids, making it an enum, we can make that more explicit. Meaning that these events would only be emitted if the node is not managing an MSP nor BSP.

RuntimeEvent::FileSystem(
pallet_file_system::Event::BspConfirmStoppedStoring {
bsp_id,
Expand Down
32 changes: 18 additions & 14 deletions client/common/src/blockchain_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,25 @@ pub fn get_events_at_block(
pub fn convert_raw_multiaddresses_to_multiaddr(multiaddresses: Multiaddresses) -> Vec<Multiaddr> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn convert_raw_multiaddresses_to_multiaddr(multiaddresses: Multiaddresses) -> Vec<Multiaddr> {
pub fn convert_raw_multiaddresses_to_multiaddr_vec(multiaddresses: Multiaddresses) -> Vec<Multiaddr> {

let mut multiaddress_vec: Vec<Multiaddr> = Vec::new();
for raw_multiaddr in multiaddresses.into_iter() {
let multiaddress = match std::str::from_utf8(&raw_multiaddr) {
Ok(s) => match Multiaddr::from_str(s) {
Ok(multiaddr) => multiaddr,
Err(e) => {
error!("Failed to parse Multiaddress from string: {:?}", e);
continue;
}
},
if let Some(multiaddress) = convert_raw_multiaddress_to_multiaddr(&raw_multiaddr) {
multiaddress_vec.push(multiaddress);
}
}
multiaddress_vec
}

pub fn convert_raw_multiaddress_to_multiaddr(raw_multiaddr: &[u8]) -> Option<Multiaddr> {
Comment on lines 57 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about making these two return a Result instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the behaviour be, when would there be an error? Right now convert_raw_multiaddresses_to_multiaddr returns back whatever is valid. I agree convert_raw_multiaddress_to_multiaddr could be changed from Option to Result but not the other (which instead could be refactored out somehow).

match std::str::from_utf8(raw_multiaddr) {
Ok(s) => match Multiaddr::from_str(s) {
Ok(multiaddr) => Some(multiaddr),
Err(e) => {
error!("Failed to parse Multiaddress from bytes: {:?}", e);
continue;
error!("Failed to parse Multiaddress from string: {:?}", e);
None
}
};

multiaddress_vec.push(multiaddress);
},
Err(e) => {
error!("Failed to parse Multiaddress from bytes: {:?}", e);
None
}
}
multiaddress_vec
}
3 changes: 3 additions & 0 deletions client/common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub type StorageProviderId = pallet_storage_providers::types::StorageProviderId<
pub type MainStorageProviderId = pallet_storage_providers::types::ProviderIdFor<Runtime>;
pub type ProviderId = pallet_proofs_dealer::types::ProviderIdFor<Runtime>;
pub type Multiaddresses = pallet_storage_providers::types::Multiaddresses<Runtime>;
pub type MultiAddress = pallet_storage_providers::types::MultiAddress<Runtime>;
pub type RandomnessOutput = pallet_proofs_dealer::types::RandomnessOutputFor<Runtime>;
pub type ForestLeaf = pallet_proofs_dealer::types::KeyFor<Runtime>;
pub type ForestRoot = pallet_proofs_dealer::types::ForestRootFor<Runtime>;
Expand All @@ -62,6 +63,8 @@ pub type KeyProofs = BTreeMap<ForestLeaf, KeyProof>;
pub type Balance = pallet_storage_providers::types::BalanceOf<Runtime>;
pub type OpaqueBlock = storage_hub_runtime::opaque::Block;
pub type BlockHash = <OpaqueBlock as BlockT>::Hash;
pub type PeerId = pallet_file_system::types::PeerId<Runtime>;

/// Type alias for the events vector.
///
/// The events vector is a storage element in the FRAME system pallet, which stores all the events
Expand Down
1 change: 1 addition & 0 deletions client/file-transfer-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async-trait = { workspace = true }
array-bytes = { workspace = true }
futures = { workspace = true }
codec = { workspace = true }
chrono = { workspace = true }

# Substrate
sc-client-api = { workspace = true }
Expand Down
Loading
Loading