Skip to content
This repository has been archived by the owner on Jul 27, 2022. It is now read-only.

Commit

Permalink
Problem: Missing runner for TDBE + No persistence of TDBE fetched tra…
Browse files Browse the repository at this point in the history
…nsactions

Solution: Added TDBE runner and persistence stream for transactions fetched by TDBE. Fixes #1995.

Note: There are still few things missing in the TDBE runner:

- Fetching remote TDBE details from RPC
- Fetching transaction IDs using light client
- Attested TVE <-> TDBE stream
  • Loading branch information
devashishdxt committed Aug 25, 2020
1 parent 0e0175e commit 4b98470
Show file tree
Hide file tree
Showing 12 changed files with 353 additions and 76 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion chain-abci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ chain-core = { path = "../chain-core" }
chain-storage = { path = "../chain-storage" }
chain-tx-filter = { path = "../chain-tx-filter" }
chain-tx-validation = { path = "../chain-tx-validation" }
enclave-protocol = { path = "../enclave-protocol" }
enclave-protocol = { path = "../enclave-protocol", features = ["edp"] }
mock-utils = { path = "../chain-tx-enclave/mock-utils" }
mls = { path = "../chain-tx-enclave-next/mls" }
ra-client = { path = "../chain-tx-enclave-next/enclave-ra/ra-client" }
Expand All @@ -39,13 +39,15 @@ structopt = "0.3"
secp256k1 = { git = "https://github.com/crypto-com/rust-secp256k1-zkp.git", rev = "1aae6edc5f1de0bbdcdb26f1f1d8b00ca28e012a", features = ["recovery", "endomorphism", "global-context"] }
parity-scale-codec = { features = ["derive"], version = "1.3" }
thiserror = "1.0"
kvdb = "0.7"

[target.'cfg(target_os = "linux")'.dependencies]
aesm-client = {version = "0.5", features = ["sgxs"], optional = true }
enclave-runner = {version = "0.4", optional = true}
sgxs-loaders = {version = "0.2", optional = true}
tokio = { version = "0.2", features = ["uds"], optional = true }
rand = "0.7"
tdbe-common = { path = "../chain-tx-enclave-next/tdbe/tdbe-common" }

[build-dependencies]
cc = "1.0"
Expand Down
3 changes: 2 additions & 1 deletion chain-abci/src/enclave_bridge/edp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod server;
pub mod tdbe;

use crate::enclave_bridge::EnclaveProxy;
use aesm_client::AesmClient;
Expand All @@ -13,7 +14,7 @@ use parity_scale_codec::{Decode, Encode};
use sgxs_loaders::isgx::Device;
use std::io::{Read, Write};
use std::sync::{mpsc::channel, Arc, Mutex};
use std::thread::{self};
use std::thread;
use std::{future::Future, io, pin::Pin};

use ra_sp_server::{config::SpRaConfig, server::SpRaServer};
Expand Down
223 changes: 223 additions & 0 deletions chain-abci/src/enclave_bridge/edp/tdbe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
use std::{
collections::HashMap,
future::Future,
io::{self, Cursor, Seek, SeekFrom},
os::unix::net::UnixStream,
pin::Pin,
sync::Arc,
thread,
};

use aesm_client::AesmClient;
use enclave_runner::{
usercalls::{AsyncListener, AsyncStream, UsercallExtension},
EnclaveBuilder,
};
use kvdb::KeyValueDB;
use sgxs_loaders::isgx::Device;
use tdbe_common::TdbeStartupConfig;
use tokio::net::{TcpListener, TcpStream};

use chain_core::tx::data::TxId;
use chain_storage::ReadOnlyStorage;
use enclave_protocol::{
codec::{StreamRead, StreamWrite},
tdbe_protocol::PersistenceCommand,
EnclaveRequest, EnclaveResponse, SealedLog,
};
use ra_sp_server::config::SpRaConfig;

use crate::enclave_bridge::TdbeConfig;

#[derive(Debug)]
pub struct TdbeApp {
/// UDS to connect to `chain-abci`
chain_abci_stream: UnixStream,
/// UDS to persist data to `chain-storage`
persistence_stream: UnixStream,
/// `ra-sp-server` address for remote attestation. E.g. `0.0.0.0:8989`
/// TODO: Replace it with a local UDS (using `chain-abci` as launcher).
sp_address: String,
/// Optional address of remote TDBE server for fetching initial data
remote_rpc_address: Option<String>,
/// Local TDBE server address to listen on. E.g. `127.0.0.1:3445`
local_listen_address: String,
}

impl TdbeApp {
/// Creates a new instance of TDBE app
pub fn new(
tdbe_config: &TdbeConfig,
ra_config: &SpRaConfig,
storage: Arc<dyn KeyValueDB>,
) -> std::io::Result<Self> {
// - `chain_abci_stream` is passed to enclave. Encalve can send requests to chain-abci
// using this
// - `chain_abci_receiver` listens to the requests sent by enclave and responds to them
let (chain_abci_stream, chain_abci_receiver) = UnixStream::pair()?;

// - `persistence_stream` is passed to enclave. Encalve can send requests to chain-storage
// using this
// - `persistence_receiver` listens to the requests sent by enclave and responds to them
let (persistence_stream, persistence_receiver) = UnixStream::pair()?;

spawn_chain_abci_thread(chain_abci_receiver, storage.clone());
spawn_persistence_thread(persistence_receiver, storage);

Ok(Self {
chain_abci_stream,
persistence_stream,
sp_address: ra_config.address.clone(),
remote_rpc_address: tdbe_config.remote_rpc_address.clone(),
local_listen_address: tdbe_config.local_listen_address.clone(),
})
}

pub fn spawn(self) {
thread::spawn(move || {
let mut device = Device::new()
.expect("SGX device was not found")
.einittoken_provider(AesmClient::new())
.build();
let mut enclave_builder = EnclaveBuilder::new("tdb-enclave-app.sgxs".as_ref());

enclave_builder
.coresident_signature()
.expect("Enclave signature file not found");
enclave_builder.usercall_extension(self);

let enclave = enclave_builder
.build(&mut device)
.expect("Failed to build enclave");
enclave.run().expect("Failed to start enclave")
});
}
}

fn spawn_chain_abci_thread(mut receiver: UnixStream, storage: Arc<dyn KeyValueDB>) {
let _ = thread::spawn(move || {
let storage = chain_storage::ReadOnlyStorage::new_db(storage);

while let Ok(enclave_request) = EnclaveRequest::read_from(&mut receiver) {
match enclave_request {
EnclaveRequest::GetSealedTxData { txids } => {
let response =
EnclaveResponse::GetSealedTxData(get_sealed_tx_data(txids, &storage));
response
.write_to(&mut receiver)
.expect("Unable to write to TDBE <-> chain-abci unix stream");
}
_ => unreachable!("TDBE can only send `GetSealedTxData` request"),
}
}
});
}

fn get_sealed_tx_data(txids: Vec<TxId>, storage: &ReadOnlyStorage) -> Option<Vec<SealedLog>> {
let mut result = Vec::with_capacity(txids.len());

for txid in txids {
if let Some(txin) = storage.get_sealed_log(&txid) {
result.push(txin);
} else {
return None;
}
}

Some(result)
}

fn spawn_persistence_thread(mut receiver: UnixStream, storage: Arc<dyn KeyValueDB>) {
let _ = thread::spawn(move || {
let mut storage = chain_storage::Storage::new_db(storage);
let mut buffer = HashMap::new();
let mut kvdb = chain_storage::buffer::BufferStore::new(&storage, &mut buffer);

while let Ok(persistence_command) = PersistenceCommand::read_from(&mut receiver) {
match persistence_command {
PersistenceCommand::Store {
transaction_id,
sealed_log,
} => chain_storage::store_sealed_log(&mut kvdb, &transaction_id, &sealed_log),
PersistenceCommand::Finish { last_fetched_block } => {
chain_storage::set_last_fetched_block(&mut kvdb, last_fetched_block);
break;
}
}
}

chain_storage::buffer::flush_storage(&mut storage, std::mem::take(&mut buffer))
.expect("Unable to flush storage");
});
}

#[allow(clippy::type_complexity)]
impl UsercallExtension for TdbeApp {
fn connect_stream<'future>(
&'future self,
addr: &'future str,
_local_addr: Option<&'future mut String>,
_peer_addr: Option<&'future mut String>,
) -> Pin<Box<dyn Future<Output = io::Result<Option<Box<dyn AsyncStream>>>> + 'future>> {
async fn connect_stream_inner(
this: &TdbeApp,
addr: &str,
) -> io::Result<Option<Box<dyn AsyncStream>>> {
match addr {
// Passes initial startup configuration to enclave
"init" => {
let tdbe_startup_config = TdbeStartupConfig {
remote_rpc_address: this.remote_rpc_address.clone(),
};

let mut stream = Cursor::new(Vec::new());
tdbe_startup_config
.write_to(&mut stream)
.expect("Unable to write initial configuration to `Cursor`");

stream
.seek(SeekFrom::Start(0))
.expect("Unable to seek to starting position on a Cursor");

Ok(Some(Box::new(stream)))
}
// Connects enclave to chain-abci
"chain-abci" => {
let stream =
tokio::net::UnixStream::from_std(this.chain_abci_stream.try_clone()?)?;
Ok(Some(Box::new(stream)))
}
// Connects enclave to ra-sp-server
"ra-sp-server" => {
let stream = TcpStream::connect(&this.sp_address).await?;
Ok(Some(Box::new(stream)))
}
_ => Ok(None),
}
}

Box::pin(connect_stream_inner(self, addr))
}

fn bind_stream<'future>(
&'future self,
addr: &'future str,
_local_addr: Option<&'future mut String>,
) -> Pin<Box<dyn Future<Output = io::Result<Option<Box<dyn AsyncListener>>>> + 'future>> {
async fn bind_stream_inner(
this: &TdbeApp,
addr: &str,
) -> io::Result<Option<Box<dyn AsyncListener>>> {
match addr {
// Binds TCP listener for TDBE server
"tdbe" => {
let listener = TcpListener::bind(&this.local_listen_address).await?;
Ok(Some(Box::new(listener)))
}
_ => Ok(None),
}
}

Box::pin(bind_stream_inner(self, addr))
}
}
13 changes: 13 additions & 0 deletions chain-abci/src/enclave_bridge/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use serde::{Deserialize, Serialize};

use enclave_protocol::{IntraEnclaveRequest, IntraEnclaveResponse};

/// TODO: feature-guard when workspaces can be built with --features flag: https://github.com/rust-lang/cargo/issues/5015
Expand All @@ -6,6 +8,17 @@ pub mod mock;
#[cfg(all(not(feature = "mock-enclave"), feature = "edp", target_os = "linux"))]
pub mod edp;

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TdbeConfig {
/// Optional TM RPC address of another TDBE server from where to fetch data
pub remote_rpc_address: Option<String>,
/// Local TDBE server address to listen on. E.g. `127.0.0.1:3445`
pub local_listen_address: String,
/// External TDBE server address, used by remote nodes to send RPC requests. E.g.
/// `<public_ip>:<public_port>`
pub external_listen_address: String,
}

/// Abstracts over communication with an external part that does enclave calls
pub trait EnclaveProxy: Sync + Send + Sized + Clone {
// sanity check for checking enclave initialization
Expand Down
16 changes: 1 addition & 15 deletions chain-abci/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use chain_abci::enclave_bridge::edp::{
};
#[cfg(any(feature = "mock-enclave", not(target_os = "linux")))]
use chain_abci::enclave_bridge::mock::MockClient;
use chain_abci::enclave_bridge::EnclaveProxy;
use chain_abci::enclave_bridge::{EnclaveProxy, TdbeConfig};
use chain_core::init::network::{get_network, get_network_id, init_chain_id};
use chain_storage::ReadOnlyStorage;
use chain_storage::{Storage, StorageConfig, StorageType};
Expand Down Expand Up @@ -37,20 +37,6 @@ pub struct Config {
data_bootstrap: TdbeConfig,
}

/// TODO: more concrete when ready
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct TdbeConfig {
/// Optional TM RPC address of another TDBE server from where to fetch data
/// TODO: it'll get txids + the TDBE connection details from RPC
/// TODO: DNS can be obtained from RPC?
pub remote_rpc_address: Option<String>,
/// Local TDBE server address to listen on. E.g. `127.0.0.1:3445`
pub local_listen_address: String,
/// External TDBE server address, used by remote nodes to send RPC requests. E.g.
/// `<public_ip>:<public_port>`
pub external_listen_address: String,
}

impl Default for Config {
fn default() -> Self {
Self {
Expand Down
10 changes: 9 additions & 1 deletion chain-storage/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use chain_core::tx::data::{
use super::buffer::{GetKV, StoreKV};
use super::{
LookupItem, StoredChainState, CHAIN_ID_KEY, COL_APP_HASHS, COL_APP_STATES, COL_EXTRA,
COL_NODE_INFO, COL_STAKING_VERSIONS, GENESIS_APP_HASH_KEY, LAST_STATE_KEY,
COL_NODE_INFO, COL_STAKING_VERSIONS, GENESIS_APP_HASH_KEY, LAST_FETCHED_BLOCK_KEY,
LAST_STATE_KEY,
};

pub fn get_last_app_state(db: &impl GetKV) -> Option<Vec<u8>> {
Expand All @@ -34,6 +35,13 @@ pub fn lookup_item(
db.get(&(col, txid_or_app_hash.to_vec()))
}

pub fn set_last_fetched_block(db: &mut impl StoreKV, last_fetched_block: u32) {
db.set(
(COL_EXTRA, LAST_FETCHED_BLOCK_KEY.to_vec()),
last_fetched_block.encode(),
)
}

pub fn insert_item(
db: &mut impl StoreKV,
item_type: LookupItem,
Expand Down
5 changes: 5 additions & 0 deletions chain-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub const NUM_COLUMNS: u32 = 12;
pub const CHAIN_ID_KEY: &[u8] = b"chain_id";
pub const GENESIS_APP_HASH_KEY: &[u8] = b"genesis_app_hash";
pub const LAST_STATE_KEY: &[u8] = b"last_state";
pub const LAST_FETCHED_BLOCK_KEY: &[u8] = b"last_fetched_block";

pub enum StorageType {
Node,
Expand Down Expand Up @@ -114,6 +115,10 @@ impl Get for ReadOnlyStorage {
}

impl ReadOnlyStorage {
pub fn new_db(db: Arc<dyn KeyValueDB>) -> Self {
Self { db }
}

pub fn get_last_app_state(&self) -> Option<Vec<u8>> {
self.db
.get(COL_NODE_INFO, LAST_STATE_KEY)
Expand Down
Loading

0 comments on commit 4b98470

Please sign in to comment.