Skip to content

Commit

Permalink
Merge commit 'bb84f7cf1d629fc2b47bd8a4c1df31bf98563136' into macrod-db
Browse files Browse the repository at this point in the history
  • Loading branch information
econsta committed Oct 13, 2023
2 parents 811fa5c + bb84f7c commit 8ce3a9d
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 132 deletions.
70 changes: 23 additions & 47 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 18 additions & 11 deletions coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct ActiveTributary<D: Db, P: P2p> {
pub tributary: Arc<Tributary<D, Transaction, P>>,
}

// Adds a tributary into the specified HashMap
// Creates a new tributary and sends it to all listeners.
async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
Expand Down Expand Up @@ -761,6 +761,8 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
);
// If we failed to publish it, restore it
batches.push_front(batch);
// Sleep for a few seconds before retrying to prevent hammering the node
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

Expand Down Expand Up @@ -985,7 +987,10 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
) {
let mut channels = HashMap::new();
for network in [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero] {
for network in serai_client::primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
}
let (send, recv) = mpsc::unbounded_channel();
tokio::spawn(handle_processor_messages(
db.clone(),
Expand Down Expand Up @@ -1048,15 +1053,17 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
async move {
loop {
let spec = new_tributary_spec_recv.recv().await.unwrap();
add_tributary(
raw_db.clone(),
key.clone(),
&processors,
p2p.clone(),
&new_tributary,
spec.clone(),
)
.await;
// Uses an inner task as Tributary::new may take several seconds
tokio::spawn({
let raw_db = raw_db.clone();
let key = key.clone();
let processors = processors.clone();
let p2p = p2p.clone();
let new_tributary = new_tributary.clone();
async move {
add_tributary(raw_db, key, &processors, p2p, &new_tributary, spec).await;
}
});
}
}
});
Expand Down
25 changes: 24 additions & 1 deletion coordinator/tributary/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ pub trait ReadWrite: Sized {

#[async_trait]
pub trait P2p: 'static + Send + Sync + Clone + Debug {
/// Broadcast a message to all other members of the Tributary with the specified genesis.
///
/// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't
/// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based
/// deduplication to ensure a sane amount of load.
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>);
}

Expand Down Expand Up @@ -178,7 +183,25 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
);
let blockchain = Arc::new(RwLock::new(blockchain));

let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };
let to_rebroadcast = Arc::new(RwLock::new(vec![]));
// Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the
// P2P layer
tokio::spawn({
let to_rebroadcast = to_rebroadcast.clone();
let p2p = p2p.clone();
async move {
loop {
let to_rebroadcast = to_rebroadcast.read().await.clone();
for msg in to_rebroadcast {
p2p.broadcast(genesis, msg).await;
}
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
}
}
});

let network =
TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p };

let TendermintHandle { synced_block, synced_block_result, messages, machine } =
TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
Expand Down
16 changes: 16 additions & 0 deletions coordinator/tributary/src/tendermint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ pub struct TendermintNetwork<D: Db, T: TransactionTrait, P: P2p> {
pub(crate) validators: Arc<Validators>,
pub(crate) blockchain: Arc<RwLock<Blockchain<D, T>>>,

pub(crate) to_rebroadcast: Arc<RwLock<Vec<Vec<u8>>>>,

pub(crate) p2p: P,
}

Expand Down Expand Up @@ -304,8 +306,19 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
}

async fn broadcast(&mut self, msg: SignedMessageFor<Self>) {
// Since we're broadcasting a Tendermint message, set it to be re-broadcasted every second
// until the block it's trying to build is complete
// If the P2P layer drops a message before all nodes obtained access, or a node had an
// intermittent failure, this will ensure reconcilliation
// Resolves halts caused by timing discrepancies, which technically are violations of
// Tendermint as a BFT protocol, and shouldn't occur yet have in low-powered testing
// environments
// This is atrocious if there's no content-based deduplication protocol for messages actively
// being gossiped
// LibP2p, as used by Serai, is configured to content-based deduplicate
let mut to_broadcast = vec![TENDERMINT_MESSAGE];
to_broadcast.extend(msg.encode());
self.to_rebroadcast.write().await.push(to_broadcast.clone());
self.p2p.broadcast(self.genesis, to_broadcast).await
}

Expand Down Expand Up @@ -407,6 +420,9 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
}
}

// Since we've added a valid block, clear to_rebroadcast
*self.to_rebroadcast.write().await = vec![];

Some(TendermintBlock(
self.blockchain.write().await.build_block::<Self>(self.signature_scheme()).serialize(),
))
Expand Down
13 changes: 8 additions & 5 deletions message-queue/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,14 @@ async fn main() {
Some(<Ristretto as Ciphersuite>::G::from_bytes(&repr).unwrap())
};

const ALL_EXT_NETWORKS: [NetworkId; 3] =
[NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero];

let register_service = |service, key| {
(*KEYS).write().unwrap().insert(service, key);
let mut queues = (*QUEUES).write().unwrap();
if service == Service::Coordinator {
for network in ALL_EXT_NETWORKS {
for network in serai_primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
}
queues.insert(
(service, Service::Processor(network)),
RwLock::new(Queue(db.clone(), service, Service::Processor(network))),
Expand All @@ -205,7 +205,10 @@ async fn main() {
};

// Make queues for each NetworkId, other than Serai
for network in ALL_EXT_NETWORKS {
for network in serai_primitives::NETWORKS {
if network == NetworkId::Serai {
continue;
}
// Use a match so we error if the list of NetworkIds changes
let Some(key) = read_key(match network {
NetworkId::Serai => unreachable!(),
Expand Down
3 changes: 1 addition & 2 deletions processor/src/networks/monero.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,7 @@ impl Network for Monero {
&spendable_outputs,
)
.await
.map_err(|_| NetworkError::ConnectionError)
.unwrap();
.map_err(|_| NetworkError::ConnectionError)?;

let inputs = spendable_outputs.into_iter().zip(decoys).collect::<Vec<_>>();

Expand Down
4 changes: 2 additions & 2 deletions substrate/client/tests/validator_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use rand_core::{RngCore, OsRng};
use sp_core::{sr25519::Public, Pair};

use serai_client::{
primitives::{NetworkId, insecure_pair_from_name},
primitives::{NETWORKS, NetworkId, insecure_pair_from_name},
validator_sets::{
primitives::{Session, ValidatorSet, musig_key},
ValidatorSetsEvent,
Expand Down Expand Up @@ -38,7 +38,7 @@ serai_test!(
.get_new_set_events(serai.get_block_by_number(0).await.unwrap().unwrap().hash())
.await
.unwrap(),
[NetworkId::Serai, NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero]
NETWORKS
.iter()
.copied()
.map(|network| ValidatorSetsEvent::NewSet {
Expand Down
7 changes: 5 additions & 2 deletions substrate/in-instructions/pallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ pub mod pallet {

let batch = batch.batch;

// TODO: Test validate_unsigned is actually called prior to execution, which is required for
// this to be safe
LastBatchBlock::<T>::insert(batch.network, frame_system::Pallet::<T>::block_number());

LastBatch::<T>::insert(batch.network, batch.id);
Expand Down Expand Up @@ -204,6 +202,11 @@ pub mod pallet {
.propagate(true)
.build()
}

// Explicitly provide a pre-dispatch which calls validate_unsigned
fn pre_dispatch(call: &Self::Call) -> Result<(), TransactionValidityError> {
Self::validate_unsigned(TransactionSource::InBlock, call).map(|_| ()).map_err(Into::into)
}
}
}

Expand Down
12 changes: 9 additions & 3 deletions substrate/node/src/chain_spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@ fn testnet_genesis(
},

validator_sets: ValidatorSetsConfig {
stake: Amount(1_000_000 * 10_u64.pow(8)),
// TODO: Array of these in primitives
networks: vec![NetworkId::Serai, NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero],
networks: serai_runtime::primitives::NETWORKS
.iter()
.map(|network| match network {
NetworkId::Serai => (NetworkId::Serai, Amount(50_000 * 10_u64.pow(8))),
NetworkId::Bitcoin => (NetworkId::Bitcoin, Amount(1_000_000 * 10_u64.pow(8))),
NetworkId::Ethereum => (NetworkId::Ethereum, Amount(1_000_000 * 10_u64.pow(8))),
NetworkId::Monero => (NetworkId::Monero, Amount(100_000 * 10_u64.pow(8))),
})
.collect(),
participants: validators.iter().map(|name| account_from_name(name)).collect(),
},
session: SessionConfig { keys: validators.iter().map(|name| session_key(*name)).collect() },
Expand Down
3 changes: 3 additions & 0 deletions substrate/primitives/src/networks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub enum NetworkId {
Monero,
}

pub const NETWORKS: [NetworkId; 4] =
[NetworkId::Serai, NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero];

/// The type used to identify coins.
#[derive(
Clone,
Expand Down
2 changes: 1 addition & 1 deletion substrate/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ parameter_types! {
NORMAL_DISPATCH_RATIO,
);

pub const MaxAuthorities: u32 = validator_sets::primitives::MAX_VALIDATORS_PER_SET;
pub const MaxAuthorities: u32 = validator_sets::primitives::MAX_KEY_SHARES_PER_SET;
}

pub struct CallFilter;
Expand Down
Loading

0 comments on commit 8ce3a9d

Please sign in to comment.