Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full domain chain snap sync. #3115

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
64e54ef
Add peer consensus for the last confirmed domain block receipt.
shamil-gadelshin Sep 30, 2024
40758d0
Introduce snap sync orchestrator.
shamil-gadelshin Oct 9, 2024
b7801f6
Introduce consensus sync params struct.
shamil-gadelshin Sep 16, 2024
ce5f5f4
Add domain snap sync algorithm.
shamil-gadelshin Sep 13, 2024
b2b85f8
Update MMR-sync
shamil-gadelshin Oct 9, 2024
ea16060
Update consensus snap-sync
shamil-gadelshin Oct 9, 2024
b9832af
Modify domain networking stack.
shamil-gadelshin Oct 10, 2024
3969d74
Update domain service (domain snap-sync integration)
shamil-gadelshin Oct 9, 2024
4a1404c
Update domain operator (domain snap-sync integration)
shamil-gadelshin Oct 10, 2024
f9f87a6
Update relayer (domain snap-sync integration)
shamil-gadelshin Oct 9, 2024
0568ab9
Update subspace-service (domain snap-sync integration)
shamil-gadelshin Oct 9, 2024
a274f32
Update subspace-node (domain snap-sync integration)
shamil-gadelshin Oct 9, 2024
bdb738a
Update test dependencies (domain snap-sync integration)
shamil-gadelshin Oct 9, 2024
859c2cd
Update Cargo.lock
shamil-gadelshin Oct 9, 2024
bf89a41
Remove unnecessary dependencies.
shamil-gadelshin Oct 11, 2024
eb78e45
Refactor domain snap sync
shamil-gadelshin Oct 14, 2024
b8571cf
Move wait_for_block_import method.
shamil-gadelshin Oct 16, 2024
c5d4b0f
Remove unnecessary AuxStore field from ConsensusChainSyncParams
shamil-gadelshin Oct 16, 2024
9956983
Change waiting for blocks to block notification acknowledgement
shamil-gadelshin Oct 17, 2024
44499af
Modify SubspaceSyncOracle to add domain snap sync.
shamil-gadelshin Oct 18, 2024
60d45e9
Refactor SnapSyncOrchestrator
shamil-gadelshin Oct 22, 2024
8a035c0
Move domain block receipt protocol to domain network
shamil-gadelshin Oct 18, 2024
bf34b67
Update last confirmed domain block execution receipt protocol.
shamil-gadelshin Oct 24, 2024
ad6a1f1
Mark block import streams as Unpin
teor2345 Oct 28, 2024
6962b8f
Merge pull request #3179 from autonomys/full-domain-snap-sync-unpin
shamil-gadelshin Oct 28, 2024
74c4f7f
Refactor domain snap sync.
shamil-gadelshin Oct 28, 2024
1f78a74
Simplify error handling.
shamil-gadelshin Oct 28, 2024
0bed801
Simplify async start_worker_task.
shamil-gadelshin Oct 29, 2024
7179802
Refactor domain snap sync
shamil-gadelshin Oct 30, 2024
b6e9f84
Add log message on. failed domain snap sync.
shamil-gadelshin Oct 30, 2024
2267205
Refactor error messages.
shamil-gadelshin Oct 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions crates/sc-consensus-subspace/src/slot_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ where
{
force_authoring: bool,
pause_sync: Arc<AtomicBool>,
domain_snap_sync_finished: Option<Arc<AtomicBool>>,
inner: SO,
}

Expand All @@ -106,6 +107,11 @@ where
// (default state), it also accounts for DSN sync
(!self.force_authoring && self.inner.is_major_syncing())
|| self.pause_sync.load(Ordering::Acquire)
|| self
.domain_snap_sync_finished
.as_ref()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this is an unusually strong ordering, which might have a performance impact. Usually it is enough to use Acquire for loads and Release for stores.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll set it to the proposed values. However, I don't think performance suffers here because of the access pattern (write once in total, query once per second). Please, correct me if I'm wrong.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides the domain worker, SubspaceSyncOracle is also used in other places of the consensus chain like the archiver, PoT worker and RPC, etc, and if the domain sync is not finished it will change the return value of is_major_syncing and some behaviors of these workers, better to have more eyes on this to see if it is okay.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SeqCst ordering has an impact on read and write performance, because it imposes a total order across all threads:
https://doc.rust-lang.org/std/sync/atomic/enum.Ordering.html#variant.SeqCst

.map(|sync_finished| !sync_finished.load(Ordering::Acquire))
.unwrap_or_default()
}

fn is_offline(&self) -> bool {
Expand All @@ -122,10 +128,12 @@ where
force_authoring: bool,
pause_sync: Arc<AtomicBool>,
substrate_sync_oracle: SO,
domain_snap_sync_finished: Option<Arc<AtomicBool>>,
) -> Self {
Self {
force_authoring,
pause_sync,
domain_snap_sync_finished,
inner: substrate_sync_oracle,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,14 @@ fn main() -> Result<(), Error> {

let keystore = partial_components.keystore_container.keystore();

let consensus_chain_node = subspace_service::new_full::<PosTable, _>(
let consensus_chain_node =
subspace_service::new_full::<PosTable, _>(
consensus_chain_config,
partial_components,
None,
true,
SlotProportion::new(3f32 / 4f32),
None,
)
.await
.map_err(|error| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use sc_cli::CliConfiguration;
use sc_consensus_subspace::block_import::BlockImportingNotification;
use sc_consensus_subspace::notification::SubspaceNotificationStream;
use sc_consensus_subspace::slot_worker::NewSlotNotification;
use sc_network::NetworkPeers;
use sc_network::{NetworkPeers, NetworkRequest};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
use sp_api::ProvideRuntimeApi;
Expand All @@ -28,6 +28,7 @@ use std::sync::Arc;
use subspace_runtime::RuntimeApi as CRuntimeApi;
use subspace_runtime_primitives::opaque::Block as CBlock;
use subspace_runtime_primitives::AccountId;
use subspace_service::domains::ConsensusChainSyncParams;
use subspace_service::FullClient as CFullClient;

/// `DomainInstanceStarter` used to start a domain instance node based on the given
Expand Down Expand Up @@ -101,7 +102,8 @@ impl DomainInstanceStarter {
block_importing_notification.block_number,
block_importing_notification.acknowledgement_sender,
)
});
})
.boxed();

let new_slot_notification_stream = || {
new_slot_notification_stream
Expand Down Expand Up @@ -161,6 +163,9 @@ impl DomainInstanceStarter {
// Always set it to `None` to not running the normal bundle producer
maybe_operator_id: None,
confirmation_depth_k: chain_constants.confirmation_depth_k(),
consensus_chain_sync_params: None::<
ConsensusChainSyncParams<_, Arc<dyn NetworkRequest + Sync + Send>>,
>,
};

let mut domain_node = domain_service::new_full::<
Expand All @@ -173,6 +178,7 @@ impl DomainInstanceStarter {
evm_domain_runtime::RuntimeApi,
AccountId20,
_,
_,
>(domain_params)
.await?;

Expand Down Expand Up @@ -219,6 +225,9 @@ impl DomainInstanceStarter {
// Always set it to `None` to not running the normal bundle producer
maybe_operator_id: None,
confirmation_depth_k: chain_constants.confirmation_depth_k(),
consensus_chain_sync_params: None::<
ConsensusChainSyncParams<_, Arc<dyn NetworkRequest + Sync + Send>>,
>,
};

let mut domain_node = domain_service::new_full::<
Expand All @@ -231,6 +240,7 @@ impl DomainInstanceStarter {
auto_id_domain_runtime::RuntimeApi,
AccountId32,
_,
_,
>(domain_params)
.await?;

Expand Down
73 changes: 44 additions & 29 deletions crates/subspace-node/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ use domain_runtime_primitives::opaque::Block as DomainBlock;
use futures::FutureExt;
use sc_cli::Signals;
use sc_consensus_slots::SlotProportion;
use sc_service::{BlocksPruning, PruningMode};
use sc_storage_monitor::StorageMonitorService;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sc_utils::mpsc::tracing_unbounded;
use sp_core::traits::SpawnEssentialNamed;
use sp_messenger::messages::ChainId;
use std::env;
use std::sync::Arc;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_runtime::{Block, RuntimeApi};
use subspace_service::config::ChainSyncMode;
use subspace_service::domains::snap_sync_orchestrator::SnapSyncOrchestrator;
use subspace_service::domains::ConsensusChainSyncParams;
use tracing::{debug, error, info, info_span, warn};

/// Options for running a node
Expand Down Expand Up @@ -118,29 +120,22 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
info!("🏷 Node name: {}", subspace_configuration.network.node_name);
info!("💾 Node path: {}", base_path.display());

if maybe_domain_configuration.is_some() && subspace_configuration.sync == ChainSyncMode::Snap {
return Err(Error::Other(
"Snap sync mode is not supported for domains, use full sync".to_string(),
));
}

if maybe_domain_configuration.is_some()
&& (matches!(
subspace_configuration.blocks_pruning,
BlocksPruning::Some(_)
) || matches!(
subspace_configuration.state_pruning,
Some(PruningMode::Constrained(_))
))
let fork_id = subspace_configuration
.base
.chain_spec
.fork_id()
.map(String::from);
let snap_sync_orchestrator = if maybe_domain_configuration.is_some()
&& subspace_configuration.sync == ChainSyncMode::Snap
{
return Err(Error::Other(
"Running an operator requires both `--blocks-pruning` and `--state-pruning` to be set \
to either `archive` or `archive-canonical`"
.to_string(),
));
}
Some(Arc::new(SnapSyncOrchestrator::new()))
} else {
None
};

let mut task_manager = {
let mut segment_headers_store = None;
let subspace_link;
let consensus_chain_node = {
let span = info_span!("Consensus");
let _enter = span.enter();
Expand All @@ -159,6 +154,10 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
))
})?;

subspace_link = partial_components.other.subspace_link.clone();

segment_headers_store.replace(partial_components.other.segment_headers_store.clone());

let full_node_fut = subspace_service::new_full::<PosTable, _>(
subspace_configuration,
partial_components,
Expand All @@ -169,6 +168,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
}),
true,
SlotProportion::new(3f32 / 4f32),
snap_sync_orchestrator.clone(),
);

full_node_fut.await.map_err(|error| {
Expand Down Expand Up @@ -278,25 +278,26 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
};

let domain_start_options = DomainStartOptions {
consensus_client: consensus_chain_node.client,
consensus_client: consensus_chain_node.client.clone(),
consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory::new(
consensus_chain_node.transaction_pool,
),
consensus_network: consensus_chain_node.network_service,
consensus_network: consensus_chain_node.network_service.clone(),
block_importing_notification_stream: consensus_chain_node
.block_importing_notification_stream,
pot_slot_info_stream: consensus_chain_node.pot_slot_info_stream,
consensus_network_sync_oracle: consensus_chain_node.sync_service,
consensus_network_sync_oracle: consensus_chain_node.sync_service.clone(),
domain_message_receiver,
gossip_message_sink,
};

consensus_chain_node
.task_manager
.spawn_essential_handle()
.spawn_essential_blocking(
"domain",
Some("domains"),
.spawn_essential_blocking("domain", Some("domains"), {
let consensus_chain_network_service =
consensus_chain_node.network_service.clone();
let consensus_chain_sync_service = consensus_chain_node.sync_service.clone();
Box::pin(async move {
let span = info_span!("Domain");
let _enter = span.enter();
Expand All @@ -305,6 +306,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
&*domain_start_options.consensus_client,
domain_configuration.domain_id,
);

let bootstrap_result = match bootstrap_result_fut.await {
Ok(bootstrap_result) => bootstrap_result,
Err(error) => {
Expand All @@ -313,17 +315,30 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
}
};

let consensus_chain_sync_params =
snap_sync_orchestrator.map(|snap_sync_orchestrator| {
ConsensusChainSyncParams {
snap_sync_orchestrator,
fork_id,
network_service: consensus_chain_network_service,
sync_service: consensus_chain_sync_service,
backend: consensus_chain_node.backend.clone(),
subspace_link,
}
});

let start_domain = run_domain(
bootstrap_result,
domain_configuration,
domain_start_options,
consensus_chain_sync_params,
);

if let Err(error) = start_domain.await {
error!(%error, "Domain starter exited with an error");
}
}),
);
})
});
};

consensus_chain_node.network_starter.start_network();
Expand Down
Loading