Skip to content
This repository has been archived by the owner on Jul 27, 2022. It is now read-only.

Commit

Permalink
Try #2189:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] authored Aug 26, 2020
2 parents e2a8ebf + b9970ae commit ed9edb0
Show file tree
Hide file tree
Showing 21 changed files with 430 additions and 188 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 50 additions & 13 deletions chain-abci/src/enclave_bridge/edp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,55 @@ use tokio::net::{TcpListener, TcpStream};

use std::os::unix::net::UnixStream;

const REMOTE_ATTESTATION_PROXY: &str = "ra-sp-server";

/// pair of unix domain sockets
/// enclave_stream is only needed / passed in `connect_stream`
/// `runner_stream` is shared in chain-abci app
#[derive(Debug)]
pub struct TxValidationApp {
enclave_stream: Option<UnixStream>,
runner_stream: Arc<Mutex<UnixStream>>,
/// `ra-sp-server` address for remote attestation. E.g. `0.0.0.0:8989`
/// FIXME: enclave direct connection -- not via TCP proxy
sp_address: Option<String>,
tdbe_stream: Option<UnixStream>,
}

impl Clone for TxValidationApp {
/// clone is only used for `TxValidationServer`/`chain-abci` having access to "runner_stream",
/// not for launching
fn clone(&self) -> Self {
Self {
enclave_stream: None,
runner_stream: self.runner_stream.clone(),
sp_address: None,
tdbe_stream: None,
}
}
}

impl Default for TxValidationApp {
fn default() -> Self {
let (sender, receiver) = UnixStream::pair().expect("init tx validation socket");
Self {
enclave_stream: Some(receiver),
runner_stream: Arc::new(Mutex::new(sender)),
}
impl TxValidationApp {
fn new(sp_address: String) -> (Self, UnixStream) {
let (sender, receiver) = UnixStream::pair().expect("init chain-abci<->tve socket");
let (from_tdbe_to_tve, from_tve_to_tdbe) =
UnixStream::pair().expect("init tve<->tdbe socket");

(
Self {
enclave_stream: Some(receiver),
runner_stream: Arc::new(Mutex::new(sender)),
sp_address: Some(sp_address),
tdbe_stream: Some(from_tve_to_tdbe),
},
from_tdbe_to_tve,
)
}
}

type UserCallStream = io::Result<Option<Box<dyn AsyncStream>>>;
type UserCallListener = io::Result<Option<Box<dyn AsyncListener>>>;
/// type aliases for outputs in UsercallExtension async return types
pub type UserCallStream = io::Result<Option<Box<dyn AsyncStream>>>;
pub type UserCallListener = io::Result<Option<Box<dyn AsyncListener>>>;

impl UsercallExtension for TxValidationApp {
fn connect_stream<'future>(
Expand All @@ -70,6 +89,22 @@ impl UsercallExtension for TxValidationApp {
Ok(None)
}
}
REMOTE_ATTESTATION_PROXY => {
if let Some(ra_address) = this.sp_address.as_ref() {
let stream = TcpStream::connect(ra_address).await?;
Ok(Some(Box::new(stream)))
} else {
Ok(None)
}
}
"tdbe" => {
if let Some(enclave_stream) = this.tdbe_stream.as_ref() {
let stream = tokio::net::UnixStream::from_std(enclave_stream.try_clone()?)?;
Ok(Some(Box::new(stream)))
} else {
Ok(None)
}
}
_ => Ok(None),
}
}
Expand Down Expand Up @@ -113,8 +148,10 @@ impl EnclaveProxy for TxValidationApp {
/// Launches tx-validation enclave --
/// it expects "tx-validation-next.sgxs" (+ signature)
/// to be in the same directory as chain-abci
pub fn launch_tx_validation() -> TxValidationApp {
let app = TxValidationApp::default();
/// it returns the "copied" app (for `TxValidationServer` / chain-abci)
/// + the unix stream for transaction data bootstrapping enclave
pub fn launch_tx_validation(ra_proxy_address: String) -> (TxValidationApp, UnixStream) {
let (app, from_tdbe_to_tve) = TxValidationApp::new(ra_proxy_address);
let app2 = app.clone();
let mut device = Device::new()
.expect("SGX device was not found")
Expand All @@ -134,7 +171,7 @@ pub fn launch_tx_validation() -> TxValidationApp {
log::info!("starting tx validation enclave");
enclave.run().expect("Failed to start enclave")
});
app2
(app2, from_tdbe_to_tve)
}

/// Temporary tx query launching options
Expand Down Expand Up @@ -167,7 +204,7 @@ impl UsercallExtension for TempTxQueryOptions {
tokio::net::UnixStream::from_std(this.chain_abci_data.try_clone()?)?;
Ok(Some(Box::new(stream)))
}
"ra-sp-server" => {
REMOTE_ATTESTATION_PROXY => {
let stream = TcpStream::connect(&this.sp_address).await?;
Ok(Some(Box::new(stream)))
}
Expand Down
81 changes: 42 additions & 39 deletions chain-abci/src/enclave_bridge/edp/tdbe.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use std::{
collections::HashMap,
future::Future,
io::{self, Cursor, Seek, SeekFrom},
io::{Cursor, Seek, SeekFrom},
os::unix::net::UnixStream,
pin::Pin,
sync::Arc,
thread,
};

use aesm_client::AesmClient;
use enclave_runner::{
usercalls::{AsyncListener, AsyncStream, UsercallExtension},
EnclaveBuilder,
};
use enclave_runner::{usercalls::UsercallExtension, EnclaveBuilder};
use kvdb::KeyValueDB;
use sgxs_loaders::isgx::Device;
use tdbe_common::TdbeStartupConfig;
Expand All @@ -27,7 +24,10 @@ use enclave_protocol::{
};
use ra_sp_server::config::SpRaConfig;

use crate::enclave_bridge::TdbeConfig;
use crate::enclave_bridge::{
edp::{UserCallListener, UserCallStream},
TdbeConfig,
};

#[derive(Debug)]
pub struct TdbeApp {
Expand All @@ -42,58 +42,62 @@ pub struct TdbeApp {
remote_rpc_address: Option<String>,
/// Local TDBE server address to listen on. E.g. `127.0.0.1:3445`
local_listen_address: String,
/// UDS to push secrets to tx-validation enclave
tve_stream: UnixStream,
}

impl TdbeApp {
/// Creates a new instance of TDBE app
pub fn new(
tdbe_config: &TdbeConfig,
ra_config: &SpRaConfig,
storage: Arc<dyn KeyValueDB>,
_storage: Arc<dyn KeyValueDB>,
tve_stream: UnixStream,
) -> std::io::Result<Self> {
// - `chain_abci_stream` is passed to enclave. Encalve can send requests to chain-abci
// using this
// - `chain_abci_receiver` listens to the requests sent by enclave and responds to them
let (chain_abci_stream, chain_abci_receiver) = UnixStream::pair()?;
let (chain_abci_stream, _chain_abci_receiver) = UnixStream::pair()?;

// - `persistence_stream` is passed to enclave. Encalve can send requests to chain-storage
// using this
// - `persistence_receiver` listens to the requests sent by enclave and responds to them
let (persistence_stream, persistence_receiver) = UnixStream::pair()?;
let (persistence_stream, _persistence_receiver) = UnixStream::pair()?;

spawn_chain_abci_thread(chain_abci_receiver, storage.clone());
spawn_persistence_thread(persistence_receiver, storage);
// FIXME: spawn these when they actually do something
// spawn_chain_abci_thread(chain_abci_receiver, storage.clone());
// spawn_persistence_thread(persistence_receiver, storage);

Ok(Self {
chain_abci_stream,
persistence_stream,
sp_address: ra_config.address.clone(),
remote_rpc_address: tdbe_config.remote_rpc_address.clone(),
local_listen_address: tdbe_config.local_listen_address.clone(),
tve_stream,
})
}

pub fn spawn(self) {
thread::spawn(move || {
let mut device = Device::new()
.expect("SGX device was not found")
.einittoken_provider(AesmClient::new())
.build();
let mut enclave_builder = EnclaveBuilder::new("tdb-enclave-app.sgxs".as_ref());

enclave_builder
.coresident_signature()
.expect("Enclave signature file not found");
enclave_builder.usercall_extension(self);

let enclave = enclave_builder
.build(&mut device)
.expect("Failed to build enclave");
enclave.run().expect("Failed to start enclave")
});
let mut device = Device::new()
.expect("SGX device was not found")
.einittoken_provider(AesmClient::new())
.build();
let mut enclave_builder = EnclaveBuilder::new("tdb-enclave-app.sgxs".as_ref());

enclave_builder
.coresident_signature()
.expect("Enclave signature file not found");
enclave_builder.usercall_extension(self);

let enclave = enclave_builder
.build(&mut device)
.expect("Failed to build enclave");
enclave.run().expect("Failed to start enclave")
}
}

#[allow(dead_code)]
fn spawn_chain_abci_thread(mut receiver: UnixStream, storage: Arc<dyn KeyValueDB>) {
let _ = thread::spawn(move || {
let storage = chain_storage::ReadOnlyStorage::new_db(storage);
Expand Down Expand Up @@ -127,6 +131,8 @@ fn get_sealed_tx_data(txids: Vec<TxId>, storage: &ReadOnlyStorage) -> Option<Vec
Some(result)
}

/// FIXME: should this start a background thread if this is one-off and the main thread needs to wait for its completion?
#[allow(dead_code)]
fn spawn_persistence_thread(mut receiver: UnixStream, storage: Arc<dyn KeyValueDB>) {
let _ = thread::spawn(move || {
let mut storage = chain_storage::Storage::new_db(storage);
Expand All @@ -151,18 +157,14 @@ fn spawn_persistence_thread(mut receiver: UnixStream, storage: Arc<dyn KeyValueD
});
}

#[allow(clippy::type_complexity)]
impl UsercallExtension for TdbeApp {
fn connect_stream<'future>(
&'future self,
addr: &'future str,
_local_addr: Option<&'future mut String>,
_peer_addr: Option<&'future mut String>,
) -> Pin<Box<dyn Future<Output = io::Result<Option<Box<dyn AsyncStream>>>> + 'future>> {
async fn connect_stream_inner(
this: &TdbeApp,
addr: &str,
) -> io::Result<Option<Box<dyn AsyncStream>>> {
) -> Pin<Box<dyn Future<Output = UserCallStream> + 'future>> {
async fn connect_stream_inner(this: &TdbeApp, addr: &str) -> UserCallStream {
match addr {
// Passes initial startup configuration to enclave
"init" => {
Expand Down Expand Up @@ -192,6 +194,10 @@ impl UsercallExtension for TdbeApp {
let stream = TcpStream::connect(&this.sp_address).await?;
Ok(Some(Box::new(stream)))
}
"tx-validation" => {
let stream = tokio::net::UnixStream::from_std(this.tve_stream.try_clone()?)?;
Ok(Some(Box::new(stream)))
}
_ => Ok(None),
}
}
Expand All @@ -203,11 +209,8 @@ impl UsercallExtension for TdbeApp {
&'future self,
addr: &'future str,
_local_addr: Option<&'future mut String>,
) -> Pin<Box<dyn Future<Output = io::Result<Option<Box<dyn AsyncListener>>>> + 'future>> {
async fn bind_stream_inner(
this: &TdbeApp,
addr: &str,
) -> io::Result<Option<Box<dyn AsyncListener>>> {
) -> Pin<Box<dyn Future<Output = UserCallListener> + 'future>> {
async fn bind_stream_inner(this: &TdbeApp, addr: &str) -> UserCallListener {
match addr {
// Binds TCP listener for TDBE server
"tdbe" => {
Expand Down
Loading

0 comments on commit ed9edb0

Please sign in to comment.